Skip to content

Commit

Permalink
Add watchers for Peering Token secrets for the peering controllers
Browse files Browse the repository at this point in the history
- When a Kubernetes secret that has a label indicating it is a peering token secret, the controllers watch those secrets and updated to those secrets re-enqueues the resource that is associated with that peering token secret.

The status is used to determine the Peering Acceptor that is re-enqueued
while the spec is used to determing the Peering Dialer that gets
re-enqueued. This is because the acceptor is responsible for creating
the secret and hence metaphorically owns the secret described in it's
status. OTOH the dialer should respond to changes in the secret
described in it's spec.

This is only supported for secrets with a Kubernetes backend.
  • Loading branch information
thisisnotashwin committed Jun 15, 2022
1 parent 0a43723 commit ff2dd5e
Show file tree
Hide file tree
Showing 5 changed files with 533 additions and 2 deletions.
4 changes: 4 additions & 0 deletions control-plane/connect-inject/annotations.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,10 @@ const (
// registered with Consul.
labelServiceIgnore = "consul.hashicorp.com/service-ignore"

// labelPeeringToken is a label that can be added to a secret to allow it to be watched
// by the peering controllers.
labelPeeringToken = "consul.hashicorp.com/peering-token"

// injected is used as the annotation value for annotationInjected.
injected = "injected"

Expand Down
55 changes: 54 additions & 1 deletion control-plane/connect-inject/peering_acceptor_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,13 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"sigs.k8s.io/controller-runtime/pkg/source"
)

// PeeringAcceptorController reconciles a PeeringAcceptor object.
Expand Down Expand Up @@ -318,7 +323,11 @@ func (r *PeeringAcceptorController) deleteK8sSecret(ctx context.Context, accepto
func (r *PeeringAcceptorController) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&consulv1alpha1.PeeringAcceptor{}).
Complete(r)
Watches(
&source.Kind{Type: &corev1.Secret{}},
handler.EnqueueRequestsFromMapFunc(r.requestsForPeeringTokens),
builder.WithPredicates(predicate.NewPredicateFuncs(r.filterPeeringAcceptors)),
).Complete(r)
}

// generateToken is a helper function that calls the Consul api to generate a token for the peer.
Expand All @@ -344,12 +353,56 @@ func (r *PeeringAcceptorController) deletePeering(ctx context.Context, peerName
return nil
}

// requestsForPeeringTokens creates a slice of requests for the peering acceptor controller.
// It enqueues a request for each acceptor that needs to be reconciled. It iterates through
// the list of endpoints and creates a request for those endpoints that have an address that
// are on the same node as the new Consul Agent pod. It receives a Pod Object which is a
// Consul Agent that has been filtered by filterAgentPods and only enqueues endpoints
// for client agent pods where the Ready condition is true.
func (r *PeeringAcceptorController) requestsForPeeringTokens(object client.Object) []reconcile.Request {
r.Log.Info("received update for Peering Token Secret", "name", object.GetName())

// Get the list of all acceptors.
var acceptorList consulv1alpha1.PeeringAcceptorList
if err := r.Client.List(r.Context, &acceptorList); err != nil {
r.Log.Error(err, "failed to list endpoints")
return []ctrl.Request{}
}
for _, acceptor := range acceptorList.Items {
if acceptor.SecretRef().Backend == "kubernetes" {
if acceptor.SecretRef().Name == object.GetName() && acceptor.Namespace == object.GetNamespace() {
return []ctrl.Request{{NamespacedName: types.NamespacedName{Namespace: acceptor.Namespace, Name: acceptor.Name}}}
}
}
}
return []ctrl.Request{}
}

// filterPeeringAcceptors receives meta and object information for Kubernetes resources that are being watched,
// which in this case are Secrets. It only returns true if the Secret is a Peering Token Secret. It reads the labels
// from the meta of the resource and uses the values of the "consul.hashicorp.com/peering-token" label to validate that
// the Secret is a Peering Token Secret.
func (r *PeeringAcceptorController) filterPeeringAcceptors(object client.Object) bool {
secretLabels := object.GetLabels()
isPeeringToken, ok := secretLabels[labelPeeringToken]
if !ok {
return false
}
if isPeeringToken == "true" {
return true
}
return false
}

// createSecret is a helper function that creates a corev1.Secret when provided inputs.
func createSecret(name, namespace, key, value string) *corev1.Secret {
secret := &corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Namespace: namespace,
Labels: map[string]string{
labelPeeringToken: "true",
},
},
Data: map[string][]byte{
key: []byte(value),
Expand Down
218 changes: 218 additions & 0 deletions control-plane/connect-inject/peering_acceptor_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"k8s.io/client-go/kubernetes/scheme"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client/fake"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
)

// TestReconcileCreateUpdatePeeringAcceptor creates a peering acceptor.
Expand Down Expand Up @@ -396,6 +397,8 @@ func TestReconcileCreateUpdatePeeringAcceptor(t *testing.T) {
require.NoError(t, err)
expSecrets := tt.expectedK8sSecrets()
require.Equal(t, expSecrets[0].Name, createdSecret.Name)
require.Contains(t, createdSecret.Labels, labelPeeringToken)
require.Equal(t, createdSecret.Labels[labelPeeringToken], "true")
// This assertion needs to be on StringData rather than Data because in the fake K8s client the contents are
// stored in StringData if that's how the secret was initialized in the fake client. In a real cluster, this
// StringData is an input only field, and shouldn't be read from.
Expand Down Expand Up @@ -943,3 +946,218 @@ func TestAcceptorUpdateStatusError(t *testing.T) {
})
}
}

func TestAcceptor_FilterPeeringAcceptor(t *testing.T) {
t.Parallel()
cases := map[string]struct {
secret *corev1.Secret
result bool
}{
"returns true if label is set to true": {
secret: &corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
Name: "test",
Namespace: "test",
Labels: map[string]string{
labelPeeringToken: "true",
},
},
},
result: true,
},
"returns false if label is set to false": {
secret: &corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
Name: "test",
Namespace: "test",
Labels: map[string]string{
labelPeeringToken: "false",
},
},
},
result: false,
},
"returns false if label is set to a non-true value": {
secret: &corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
Name: "test",
Namespace: "test",
Labels: map[string]string{
labelPeeringToken: "foo",
},
},
},
result: false,
},
"returns false if label is not set": {
secret: &corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
Name: "test",
Namespace: "test",
},
},
result: false,
},
}

for name, tt := range cases {
t.Run(name, func(t *testing.T) {
controller := PeeringAcceptorController{}
result := controller.filterPeeringAcceptors(tt.secret)
require.Equal(t, tt.result, result)
})
}
}

func TestAcceptor_RequestsForPeeringTokens(t *testing.T) {
t.Parallel()
cases := map[string]struct {
secret *corev1.Secret
acceptors v1alpha1.PeeringAcceptorList
result []reconcile.Request
}{
"secret matches existing acceptor": {
secret: &corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
Name: "test",
Namespace: "test",
},
},
acceptors: v1alpha1.PeeringAcceptorList{
Items: []v1alpha1.PeeringAcceptor{
{
ObjectMeta: metav1.ObjectMeta{
Name: "peering",
Namespace: "test",
},
Status: v1alpha1.PeeringAcceptorStatus{
SecretRef: &v1alpha1.SecretRefStatus{
Secret: v1alpha1.Secret{
Name: "test",
Key: "test",
Backend: "kubernetes",
},
},
},
},
},
},
result: []reconcile.Request{
{
NamespacedName: types.NamespacedName{
Namespace: "test",
Name: "peering",
},
},
},
},
"does not match if backend is not kubernetes": {
secret: &corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
Name: "test",
Namespace: "test",
},
},
acceptors: v1alpha1.PeeringAcceptorList{
Items: []v1alpha1.PeeringAcceptor{
{
ObjectMeta: metav1.ObjectMeta{
Name: "peering",
Namespace: "test",
},
Status: v1alpha1.PeeringAcceptorStatus{
SecretRef: &v1alpha1.SecretRefStatus{
Secret: v1alpha1.Secret{
Name: "test",
Key: "test",
Backend: "vault",
},
},
},
},
},
},
result: []reconcile.Request{},
},
"only matches with the correct acceptor": {
secret: &corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
Name: "test",
Namespace: "test",
},
},
acceptors: v1alpha1.PeeringAcceptorList{
Items: []v1alpha1.PeeringAcceptor{
{
ObjectMeta: metav1.ObjectMeta{
Name: "peering-1",
Namespace: "test",
},
Status: v1alpha1.PeeringAcceptorStatus{
SecretRef: &v1alpha1.SecretRefStatus{
Secret: v1alpha1.Secret{
Name: "test",
Key: "test",
Backend: "kubernetes",
},
},
},
},
{
ObjectMeta: metav1.ObjectMeta{
Name: "peering-2",
Namespace: "test-2",
},
Status: v1alpha1.PeeringAcceptorStatus{
SecretRef: &v1alpha1.SecretRefStatus{
Secret: v1alpha1.Secret{
Name: "test",
Key: "test",
Backend: "kubernetes",
},
},
},
},
{
ObjectMeta: metav1.ObjectMeta{
Name: "peering-3",
Namespace: "test",
},
Status: v1alpha1.PeeringAcceptorStatus{
SecretRef: &v1alpha1.SecretRefStatus{
Secret: v1alpha1.Secret{
Name: "test-2",
Key: "test",
Backend: "kubernetes",
},
},
},
},
},
},
result: []reconcile.Request{
{
NamespacedName: types.NamespacedName{
Namespace: "test",
Name: "peering-1",
},
},
},
},
}

for name, tt := range cases {
t.Run(name, func(t *testing.T) {
s := scheme.Scheme
s.AddKnownTypes(v1alpha1.GroupVersion, &v1alpha1.PeeringAcceptor{}, &v1alpha1.PeeringAcceptorList{})
fakeClient := fake.NewClientBuilder().WithScheme(s).WithRuntimeObjects(tt.secret, &tt.acceptors).Build()
controller := PeeringAcceptorController{
Client: fakeClient,
Log: logrtest.TestLogger{T: t},
}
result := controller.requestsForPeeringTokens(tt.secret)

require.Equal(t, tt.result, result)
})
}
}
42 changes: 41 additions & 1 deletion control-plane/connect-inject/peering_dialer_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,13 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"sigs.k8s.io/controller-runtime/pkg/source"
)

// PeeringDialerController reconciles a PeeringDialer object.
Expand Down Expand Up @@ -231,7 +236,11 @@ func (r *PeeringDialerController) getSecret(ctx context.Context, name string, na
func (r *PeeringDialerController) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&consulv1alpha1.PeeringDialer{}).
Complete(r)
Watches(
&source.Kind{Type: &corev1.Secret{}},
handler.EnqueueRequestsFromMapFunc(r.requestsForPeeringTokens),
builder.WithPredicates(predicate.NewPredicateFuncs(r.filterPeeringDialers)),
).Complete(r)
}

// establishPeering is a helper function that calls the Consul api to generate a token for the peer.
Expand All @@ -257,3 +266,34 @@ func (r *PeeringDialerController) deletePeering(ctx context.Context, peerName st
}
return nil
}

func (r *PeeringDialerController) requestsForPeeringTokens(object client.Object) []reconcile.Request {
r.Log.Info("received update for Peering Token Secret", "name", object.GetName())

// Get the list of all dialers.
var dialerList consulv1alpha1.PeeringDialerList
if err := r.Client.List(r.Context, &dialerList); err != nil {
r.Log.Error(err, "failed to list endpoints")
return []ctrl.Request{}
}
for _, dialer := range dialerList.Items {
if dialer.Secret().Backend == "kubernetes" {
if dialer.Secret().Name == object.GetName() && dialer.Namespace == object.GetNamespace() {
return []ctrl.Request{{NamespacedName: types.NamespacedName{Namespace: dialer.Namespace, Name: dialer.Name}}}
}
}
}
return []ctrl.Request{}
}

func (r *PeeringDialerController) filterPeeringDialers(object client.Object) bool {
secretLabels := object.GetLabels()
isPeeringToken, ok := secretLabels[labelPeeringToken]
if !ok {
return false
}
if isPeeringToken == "true" {
return true
}
return false
}
Loading

0 comments on commit ff2dd5e

Please sign in to comment.