diff --git a/control-plane/api/v1alpha1/registration_types.go b/control-plane/api/v1alpha1/registration_types.go index 9ca58e7c23..c253f25aa6 100644 --- a/control-plane/api/v1alpha1/registration_types.go +++ b/control-plane/api/v1alpha1/registration_types.go @@ -13,6 +13,7 @@ import ( corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" ) func init() { @@ -289,6 +290,13 @@ func (r *Registration) ToCatalogDeregistration() *capi.CatalogDeregistration { } } +func (r *Registration) NamespacedName() types.NamespacedName { + return types.NamespacedName{ + Namespace: r.Namespace, + Name: r.Name, + } +} + // SetSyncedCondition sets the synced condition on the Registration. func (r *Registration) SetSyncedCondition(status corev1.ConditionStatus, reason string, message string) { r.Status.Conditions = Conditions{ diff --git a/control-plane/controllers/configentries/registrations_controller.go b/control-plane/controllers/configentries/registrations_controller.go index c9137ff63a..86513ca909 100644 --- a/control-plane/controllers/configentries/registrations_controller.go +++ b/control-plane/controllers/configentries/registrations_controller.go @@ -5,6 +5,9 @@ package configentries import ( "context" + "fmt" + "slices" + "strings" "time" "github.com/go-logr/logr" @@ -15,8 +18,11 @@ import ( "k8s.io/apimachinery/pkg/types" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/handler" + "sigs.k8s.io/controller-runtime/pkg/reconcile" capi "github.com/hashicorp/consul/api" + "github.com/hashicorp/go-multierror" "github.com/hashicorp/consul-k8s/control-plane/api/v1alpha1" "github.com/hashicorp/consul-k8s/control-plane/consul" @@ -24,14 +30,21 @@ import ( const RegistrationFinalizer = "registration.finalizers.consul.hashicorp.com" +// Status Reasons. +const ( + ConsulErrorRegistration = "ConsulErrorRegistration" + ConsulErrorDeregistration = "ConsulErrorDeregistration" + ConsulErrorACL = "ConsulErrorACL" +) + // RegistrationsController is the controller for Registrations resources. type RegistrationsController struct { client.Client FinalizerPatcher - Log logr.Logger Scheme *runtime.Scheme ConsulClientConfig *consul.Config ConsulServerConnMgr consul.ServerConnectionManager + Log logr.Logger } // +kubebuilder:rbac:groups=consul.hashicorp.com,resources=servicerouters,verbs=get;list;watch;create;update;patch;delete @@ -58,42 +71,50 @@ func (r *RegistrationsController) Reconcile(ctx context.Context, req ctrl.Reques // deletion request if !registration.ObjectMeta.DeletionTimestamp.IsZero() { - log.Info("Deregistering service") - err = r.deregisterService(ctx, log, client, registration) + err := r.handleDeletion(ctx, log, client, registration) if err != nil { - r.updateStatusError(ctx, registration, "ConsulErrorDeregistration", err) return ctrl.Result{}, err } - err := r.updateStatus(ctx, req.NamespacedName) - if err != nil { - log.Error(err, "failed to update status") - } - return ctrl.Result{}, nil } - log.Info("Registering service") - err = r.registerService(ctx, log, client, registration) + // registration request + err = r.handleRegistration(ctx, log, client, registration) if err != nil { - r.updateStatusError(ctx, registration, "ConsulErrorRegistration", err) return ctrl.Result{}, err } - - err = r.updateStatus(ctx, req.NamespacedName) - if err != nil { - log.Error(err, "failed to update status") - } - return ctrl.Result{}, err + return ctrl.Result{}, nil } -func (r *RegistrationsController) registerService(ctx context.Context, log logr.Logger, client *capi.Client, registration *v1alpha1.Registration) error { - patch := r.AddFinalizersPatch(registration, RegistrationFinalizer) +func (r *RegistrationsController) handleRegistration(ctx context.Context, log logr.Logger, client *capi.Client, registration *v1alpha1.Registration) error { + log.Info("Registering service") + patch := r.AddFinalizersPatch(registration, RegistrationFinalizer) err := r.Patch(ctx, registration, patch) if err != nil { return err } + err = r.registerService(log, client, registration) + if err != nil { + r.updateStatusError(ctx, log, registration, ConsulErrorRegistration, err) + return err + } + if r.ConsulClientConfig.APIClientConfig.Token != "" || r.ConsulClientConfig.APIClientConfig.TokenFile != "" { + err = r.updateTermGWACLRole(ctx, log, client, registration) + if err != nil { + r.updateStatusError(ctx, log, registration, ConsulErrorACL, err) + return err + } + } + err = r.updateStatus(ctx, log, registration.NamespacedName()) + if err != nil { + return err + } + return nil +} + +func (r *RegistrationsController) registerService(log logr.Logger, client *capi.Client, registration *v1alpha1.Registration) error { regReq, err := registration.ToCatalogRegistration() if err != nil { return err @@ -109,39 +130,217 @@ func (r *RegistrationsController) registerService(ctx context.Context, log logr. return nil } -func (r *RegistrationsController) deregisterService(ctx context.Context, log logr.Logger, client *capi.Client, registration *v1alpha1.Registration) error { - deRegReq := registration.ToCatalogDeregistration() +func termGWContainsService(registration *v1alpha1.Registration) func(v1alpha1.LinkedService) bool { + return func(svc v1alpha1.LinkedService) bool { + return svc.Name == registration.Spec.Service.Name + } +} - _, err := client.Catalog().Deregister(deRegReq, nil) +func (r *RegistrationsController) updateTermGWACLRole(ctx context.Context, log logr.Logger, client *capi.Client, registration *v1alpha1.Registration) error { + termGWList := &v1alpha1.TerminatingGatewayList{} + err := r.Client.List(ctx, termGWList) if err != nil { - log.Error(err, "error deregistering service", "svcID", deRegReq.ServiceID) + log.Error(err, "error listing terminating gateways") return err } + termGWsToUpdate := make([]v1alpha1.TerminatingGateway, 0, len(termGWList.Items)) + for _, termGW := range termGWList.Items { + if slices.ContainsFunc(termGW.Spec.Services, termGWContainsService(registration)) { + termGWsToUpdate = append(termGWsToUpdate, termGW) + } + } + + if len(termGWsToUpdate) == 0 { + log.Info("terminating gateway not found") + return nil + } + + roles, _, err := client.ACL().RoleList(nil) + if err != nil { + log.Error(err, "error reading role list") + return err + } + + policy := &capi.ACLPolicy{ + Name: servicePolicyName(registration.Spec.Service.Name), + Description: "Write policy for terminating gateways for external service", + Rules: fmt.Sprintf(`service %q { policy = "write" }`, registration.Spec.Service.Name), + Datacenters: []string{registration.Spec.Datacenter}, + Namespace: registration.Spec.Service.Namespace, + Partition: registration.Spec.Service.Partition, + } + + existingPolicy, _, err := client.ACL().PolicyReadByName(policy.Name, nil) + if err != nil { + log.Error(err, "error reading policy") + return err + } + + if existingPolicy == nil { + policy, _, err = client.ACL().PolicyCreate(policy, nil) + if err != nil { + return fmt.Errorf("error creating policy: %w", err) + } + } else { + policy = existingPolicy + } + + mErr := &multierror.Error{} + + for _, termGW := range termGWsToUpdate { + var role *capi.ACLRole + for _, r := range roles { + if strings.HasSuffix(r.Name, fmt.Sprintf("-%s-acl-role", termGW.Name)) { + role = r + break + } + } + + if role == nil { + log.Info("terminating gateway role not found", "terminatingGatewayName", termGW.Name) + mErr = multierror.Append(mErr, fmt.Errorf("terminating gateway role not found for %q", termGW.Name)) + continue + } + + role.Policies = append(role.Policies, &capi.ACLRolePolicyLink{Name: policy.Name, ID: policy.ID}) + + _, _, err = client.ACL().RoleUpdate(role, nil) + if err != nil { + log.Error(err, "error updating role", "roleName", role.Name) + mErr = multierror.Append(mErr, fmt.Errorf("error updating role %q", role.Name)) + continue + } + } + + return mErr.ErrorOrNil() +} + +func (r *RegistrationsController) handleDeletion(ctx context.Context, log logr.Logger, client *capi.Client, registration *v1alpha1.Registration) error { + log.Info("Deregistering service") + err := r.deregisterService(log, client, registration) + if err != nil { + r.updateStatusError(ctx, log, registration, ConsulErrorDeregistration, err) + return err + } + if r.ConsulClientConfig.APIClientConfig.Token != "" || r.ConsulClientConfig.APIClientConfig.TokenFile != "" { + err = r.removeTermGWACLRole(ctx, log, client, registration) + if err != nil { + r.updateStatusError(ctx, log, registration, ConsulErrorACL, err) + return err + } + } patch := r.RemoveFinalizersPatch(registration, RegistrationFinalizer) + err = r.Patch(ctx, registration, patch) + if err != nil { + return err + } + + return nil +} - if err := r.Patch(ctx, registration, patch); err != nil { +func (r *RegistrationsController) deregisterService(log logr.Logger, client *capi.Client, registration *v1alpha1.Registration) error { + deRegReq := registration.ToCatalogDeregistration() + + _, err := client.Catalog().Deregister(deRegReq, nil) + if err != nil { + log.Error(err, "error deregistering service", "svcID", deRegReq.ServiceID) return err } + log.Info("Successfully deregistered service", "svcID", deRegReq.ServiceID) return nil } -func (r *RegistrationsController) Logger(name types.NamespacedName) logr.Logger { - return r.Log.WithValues("request", name) +func (r *RegistrationsController) removeTermGWACLRole(ctx context.Context, log logr.Logger, client *capi.Client, registration *v1alpha1.Registration) error { + termGWList := &v1alpha1.TerminatingGatewayList{} + err := r.Client.List(ctx, termGWList) + if err != nil { + return err + } + + termGWsToUpdate := make([]v1alpha1.TerminatingGateway, 0, len(termGWList.Items)) + for _, termGW := range termGWList.Items { + if slices.ContainsFunc(termGW.Spec.Services, termGWContainsService(registration)) { + termGWsToUpdate = append(termGWsToUpdate, termGW) + } + } + + if len(termGWsToUpdate) == 0 { + log.Info("terminating gateway not found") + return nil + } + + roles, _, err := client.ACL().RoleList(nil) + if err != nil { + return err + } + + mErr := &multierror.Error{} + for _, termGW := range termGWsToUpdate { + var role *capi.ACLRole + for _, r := range roles { + if strings.HasSuffix(r.Name, fmt.Sprintf("-%s-acl-role", termGW.Name)) { + role = r + break + } + } + + if role == nil { + log.Info("terminating gateway role not found", "terminatingGatewayName", termGW.Name) + mErr = multierror.Append(mErr, fmt.Errorf("terminating gateway role not found for %q", termGW.Name)) + continue + } + + var policyID string + + expectedPolicyName := servicePolicyName(registration.Spec.Service.Name) + role.Policies = slices.DeleteFunc(role.Policies, func(i *capi.ACLRolePolicyLink) bool { + if i.Name == expectedPolicyName { + policyID = i.ID + return true + } + return false + }) + + if policyID == "" { + log.Info("policy not found on terminating gateway role", "policyName", expectedPolicyName, "terminatingGatewayName", termGW.Name) + continue + } + + _, _, err = client.ACL().RoleUpdate(role, nil) + if err != nil { + log.Error(err, "error updating role", "roleName", role.Name) + mErr = multierror.Append(mErr, fmt.Errorf("error updating role %q", role.Name)) + continue + } + + _, err = client.ACL().PolicyDelete(policyID, nil) + if err != nil { + log.Error(err, "error deleting service policy", "policyID", policyID, "policyName", expectedPolicyName) + mErr = multierror.Append(mErr, fmt.Errorf("error deleting service ACL policy %q", policyID)) + continue + } + } + + return mErr.ErrorOrNil() } -func (r *RegistrationsController) updateStatusError(ctx context.Context, registration *v1alpha1.Registration, reason string, reconcileErr error) { +func servicePolicyName(name string) string { + return fmt.Sprintf("%s-write-policy", name) +} + +func (r *RegistrationsController) updateStatusError(ctx context.Context, log logr.Logger, registration *v1alpha1.Registration, reason string, reconcileErr error) { registration.SetSyncedCondition(corev1.ConditionFalse, reason, reconcileErr.Error()) registration.Status.LastSyncedTime = &metav1.Time{Time: time.Now()} err := r.Status().Update(ctx, registration) if err != nil { - r.Log.Error(err, "failed to update Registration status", "name", registration.Name, "namespace", registration.Namespace) + log.Error(err, "failed to update Registration status", "name", registration.Name, "namespace", registration.Namespace) } } -func (r *RegistrationsController) updateStatus(ctx context.Context, req types.NamespacedName) error { +func (r *RegistrationsController) updateStatus(ctx context.Context, log logr.Logger, req types.NamespacedName) error { registration := &v1alpha1.Registration{} err := r.Get(ctx, req, registration) @@ -154,12 +353,56 @@ func (r *RegistrationsController) updateStatus(ctx context.Context, req types.Na err = r.Status().Update(ctx, registration) if err != nil { - r.Log.Error(err, "failed to update Registration status", "name", registration.Name, "namespace", registration.Namespace) + log.Error(err, "failed to update Registration status", "name", registration.Name, "namespace", registration.Namespace) return err } return nil } -func (r *RegistrationsController) SetupWithManager(mgr ctrl.Manager) error { - return setupWithManager(mgr, &v1alpha1.Registration{}, r) +func (r *RegistrationsController) Logger(name types.NamespacedName) logr.Logger { + return r.Log.WithValues("request", name) +} + +const registrationByServiceNameIndex = "registrationName" + +func (r *RegistrationsController) SetupWithManager(ctx context.Context, mgr ctrl.Manager) error { + // setup the index to lookup registrations by service name + if err := mgr.GetFieldIndexer().IndexField(ctx, &v1alpha1.Registration{}, registrationByServiceNameIndex, indexerFn); err != nil { + return err + } + + return ctrl.NewControllerManagedBy(mgr). + For(&v1alpha1.Registration{}). + Watches(&v1alpha1.TerminatingGateway{}, handler.EnqueueRequestsFromMapFunc(r.transformTerminatingGateways)). + Complete(r) +} + +func indexerFn(o client.Object) []string { + reg := o.(*v1alpha1.Registration) + return []string{reg.Spec.Service.Name} +} + +func (r *RegistrationsController) transformTerminatingGateways(ctx context.Context, o client.Object) []reconcile.Request { + termGW := o.(*v1alpha1.TerminatingGateway) + reqs := make([]reconcile.Request, 0, len(termGW.Spec.Services)) + for _, svc := range termGW.Spec.Services { + // lookup registrationList by service name and add it to the reconcile request + registrationList := &v1alpha1.RegistrationList{} + + err := r.Client.List(ctx, registrationList, client.MatchingFields{registrationByServiceNameIndex: svc.Name}) + if err != nil { + r.Log.Error(err, "error listing registrations by service name", "serviceName", svc.Name) + continue + } + + for _, reg := range registrationList.Items { + reqs = append(reqs, reconcile.Request{ + NamespacedName: types.NamespacedName{ + Name: reg.Name, + Namespace: reg.Namespace, + }, + }) + } + } + return reqs } diff --git a/control-plane/controllers/configentries/registrations_controller_test.go b/control-plane/controllers/configentries/registrations_controller_test.go index 5e2b2721fa..805be4a014 100644 --- a/control-plane/controllers/configentries/registrations_controller_test.go +++ b/control-plane/controllers/configentries/registrations_controller_test.go @@ -5,6 +5,8 @@ package configentries_test import ( "context" + "encoding/json" + "fmt" "net/http" "net/http/httptest" "net/url" @@ -24,6 +26,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client/fake" capi "github.com/hashicorp/consul/api" + "github.com/hashicorp/go-uuid" "github.com/hashicorp/consul-k8s/control-plane/api/v1alpha1" "github.com/hashicorp/consul-k8s/control-plane/consul" @@ -31,13 +34,124 @@ import ( "github.com/hashicorp/consul-k8s/control-plane/helper/test" ) +type serverResponseConfig struct { + registering bool + aclEnabled bool + errOnRegister bool + errOnDeregister bool + errOnPolicyRead bool + errOnPolicyWrite bool + errOnPolicyDelete bool + errOnRoleUpdate bool + policyExists bool + temGWRoleMissing bool +} + func TestReconcile_Success(tt *testing.T) { deletionTime := metav1.Now() cases := map[string]struct { - registration *v1alpha1.Registration - expectedConditions []v1alpha1.Condition + registration *v1alpha1.Registration + terminatingGateways []runtime.Object + serverResponseConfig serverResponseConfig + expectedFinalizers []string + expectedConditions []v1alpha1.Condition }{ - "success on registration": { + "registering - success on registration": { + registration: &v1alpha1.Registration{ + TypeMeta: metav1.TypeMeta{ + Kind: "Registration", + APIVersion: "consul.hashicorp.com/v1alpha1", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "test-registration", + Finalizers: []string{configentries.RegistrationFinalizer}, + }, + Spec: v1alpha1.RegistrationSpec{ + ID: "node-id", + Node: "virtual-node", + Address: "127.0.0.1", + Datacenter: "dc1", + Service: v1alpha1.Service{ + ID: "service-id", + Name: "service-name", + Port: 8080, + Address: "127.0.0.1", + }, + }, + }, + terminatingGateways: []runtime.Object{ + &v1alpha1.TerminatingGateway{ + ObjectMeta: metav1.ObjectMeta{ + Name: "terminating-gateway", + }, + Spec: v1alpha1.TerminatingGatewaySpec{ + Services: []v1alpha1.LinkedService{ + { + Name: "service-name", + }, + }, + }, + }, + }, + serverResponseConfig: serverResponseConfig{registering: true}, + expectedFinalizers: []string{configentries.RegistrationFinalizer}, + expectedConditions: []v1alpha1.Condition{{ + Type: "Synced", + Status: v1.ConditionTrue, + Reason: "", + Message: "", + }}, + }, + "registering -- ACLs enabled and policy does not exist": { + registration: &v1alpha1.Registration{ + TypeMeta: metav1.TypeMeta{ + Kind: "Registration", + APIVersion: "consul.hashicorp.com/v1alpha1", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "test-registration", + Finalizers: []string{configentries.RegistrationFinalizer}, + }, + Spec: v1alpha1.RegistrationSpec{ + ID: "node-id", + Node: "virtual-node", + Address: "127.0.0.1", + Datacenter: "dc1", + Service: v1alpha1.Service{ + ID: "service-id", + Name: "service-name", + Port: 8080, + Address: "127.0.0.1", + }, + }, + }, + terminatingGateways: []runtime.Object{ + &v1alpha1.TerminatingGateway{ + ObjectMeta: metav1.ObjectMeta{ + Name: "terminating-gateway", + }, + Spec: v1alpha1.TerminatingGatewaySpec{ + Services: []v1alpha1.LinkedService{ + { + Name: "service-name", + }, + }, + }, + }, + }, + serverResponseConfig: serverResponseConfig{ + registering: true, + aclEnabled: true, + }, + expectedFinalizers: []string{configentries.RegistrationFinalizer}, + expectedConditions: []v1alpha1.Condition{{ + Type: "Synced", + Status: v1.ConditionTrue, + Reason: "", + Message: "", + }}, + }, + "registering -- ACLs enabled and policy does exists": { registration: &v1alpha1.Registration{ TypeMeta: metav1.TypeMeta{ Kind: "Registration", @@ -60,6 +174,26 @@ func TestReconcile_Success(tt *testing.T) { }, }, }, + terminatingGateways: []runtime.Object{ + &v1alpha1.TerminatingGateway{ + ObjectMeta: metav1.ObjectMeta{ + Name: "terminating-gateway", + }, + Spec: v1alpha1.TerminatingGatewaySpec{ + Services: []v1alpha1.LinkedService{ + { + Name: "service-name", + }, + }, + }, + }, + }, + serverResponseConfig: serverResponseConfig{ + registering: true, + aclEnabled: true, + policyExists: true, + }, + expectedFinalizers: []string{configentries.RegistrationFinalizer}, expectedConditions: []v1alpha1.Condition{{ Type: "Synced", Status: v1.ConditionTrue, @@ -67,7 +201,7 @@ func TestReconcile_Success(tt *testing.T) { Message: "", }}, }, - "success on deregistration": { + "deregistering": { registration: &v1alpha1.Registration{ TypeMeta: metav1.TypeMeta{ Kind: "Registration", @@ -91,6 +225,68 @@ func TestReconcile_Success(tt *testing.T) { }, }, }, + terminatingGateways: []runtime.Object{ + &v1alpha1.TerminatingGateway{ + ObjectMeta: metav1.ObjectMeta{ + Name: "terminating-gateway", + }, + Spec: v1alpha1.TerminatingGatewaySpec{ + Services: []v1alpha1.LinkedService{ + { + Name: "service-name", + }, + }, + }, + }, + }, + serverResponseConfig: serverResponseConfig{ + registering: false, + aclEnabled: false, + }, + expectedConditions: []v1alpha1.Condition{}, + }, + "deregistering - ACLs enabled": { + registration: &v1alpha1.Registration{ + TypeMeta: metav1.TypeMeta{ + Kind: "Registration", + APIVersion: "consul.hashicorp.com/v1alpha1", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "test-registration", + Finalizers: []string{configentries.RegistrationFinalizer}, + DeletionTimestamp: &deletionTime, + }, + Spec: v1alpha1.RegistrationSpec{ + ID: "node-id", + Node: "virtual-node", + Address: "127.0.0.1", + Datacenter: "dc1", + Service: v1alpha1.Service{ + ID: "service-id", + Name: "service-name", + Port: 8080, + Address: "127.0.0.1", + }, + }, + }, + terminatingGateways: []runtime.Object{ + &v1alpha1.TerminatingGateway{ + ObjectMeta: metav1.ObjectMeta{ + Name: "terminating-gateway", + }, + Spec: v1alpha1.TerminatingGatewaySpec{ + Services: []v1alpha1.LinkedService{ + { + Name: "service-name", + }, + }, + }, + }, + }, + serverResponseConfig: serverResponseConfig{ + registering: false, + aclEnabled: true, + }, expectedConditions: []v1alpha1.Condition{}, }, } @@ -100,29 +296,17 @@ func TestReconcile_Success(tt *testing.T) { tt.Run(name, func(t *testing.T) { t.Parallel() s := runtime.NewScheme() - s.AddKnownTypes(v1alpha1.GroupVersion, &v1alpha1.Registration{}) + s.AddKnownTypes(v1alpha1.GroupVersion, &v1alpha1.Registration{}, &v1alpha1.TerminatingGateway{}, &v1alpha1.TerminatingGatewayList{}) ctx := context.Background() - consulServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - w.WriteHeader(200) - })) + consulServer, testClient := fakeConsulServer(t, tc.serverResponseConfig, tc.registration.Spec.Service.Name) defer consulServer.Close() - parsedURL, err := url.Parse(consulServer.URL) - require.NoError(t, err) - host := strings.Split(parsedURL.Host, ":")[0] - - port, err := strconv.Atoi(parsedURL.Port()) - require.NoError(t, err) - - testClient := &test.TestServerClient{ - Cfg: &consul.Config{APIClientConfig: &capi.Config{Address: host}, HTTPPort: port}, - Watcher: test.MockConnMgrForIPAndPort(t, host, port, false), - } - + runtimeObjs := []runtime.Object{tc.registration} + runtimeObjs = append(runtimeObjs, tc.terminatingGateways...) fakeClient := fake.NewClientBuilder(). WithScheme(s). - WithRuntimeObjects(tc.registration). + WithRuntimeObjects(runtimeObjs...). WithStatusSubresource(&v1alpha1.Registration{}). Build() @@ -134,7 +318,7 @@ func TestReconcile_Success(tt *testing.T) { ConsulServerConnMgr: testClient.Watcher, } - _, err = controller.Reconcile(ctx, ctrl.Request{ + _, err := controller.Reconcile(ctx, ctrl.Request{ NamespacedName: types.NamespacedName{Name: tc.registration.Name, Namespace: tc.registration.Namespace}, }) require.NoError(t, err) @@ -149,6 +333,8 @@ func TestReconcile_Success(tt *testing.T) { t.Errorf("unexpected condition diff: %s", diff) } } + + require.ElementsMatch(t, fetchedReg.Finalizers, tc.expectedFinalizers) }) } } @@ -156,10 +342,12 @@ func TestReconcile_Success(tt *testing.T) { func TestReconcile_Failure(tt *testing.T) { deletionTime := metav1.Now() cases := map[string]struct { - registration *v1alpha1.Registration - expectedConditions []v1alpha1.Condition + registration *v1alpha1.Registration + terminatingGateways []runtime.Object + serverResponseConfig serverResponseConfig + expectedConditions []v1alpha1.Condition }{ - "failure on registration": { + "registering - registration call to consul fails": { registration: &v1alpha1.Registration{ TypeMeta: metav1.TypeMeta{ Kind: "Registration", @@ -182,14 +370,326 @@ func TestReconcile_Failure(tt *testing.T) { }, }, }, + terminatingGateways: []runtime.Object{ + &v1alpha1.TerminatingGateway{ + ObjectMeta: metav1.ObjectMeta{ + Name: "terminating-gateway", + }, + Spec: v1alpha1.TerminatingGatewaySpec{ + Services: []v1alpha1.LinkedService{ + { + Name: "service-name", + }, + }, + }, + }, + }, + serverResponseConfig: serverResponseConfig{ + registering: true, + errOnRegister: true, + }, expectedConditions: []v1alpha1.Condition{{ Type: "Synced", Status: v1.ConditionFalse, - Reason: "ConsulErrorRegistration", + Reason: configentries.ConsulErrorRegistration, Message: "", }}, }, - "failure on deregistration": { + "registering - terminating gateway acl role not found": { + registration: &v1alpha1.Registration{ + TypeMeta: metav1.TypeMeta{ + Kind: "Registration", + APIVersion: "consul.hashicorp.com/v1alpha1", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "test-registration", + Finalizers: []string{configentries.RegistrationFinalizer}, + }, + Spec: v1alpha1.RegistrationSpec{ + ID: "node-id", + Node: "virtual-node", + Address: "127.0.0.1", + Datacenter: "dc1", + Service: v1alpha1.Service{ + ID: "service-id", + Name: "service-name", + Port: 8080, + Address: "127.0.0.1", + }, + }, + }, + terminatingGateways: []runtime.Object{ + &v1alpha1.TerminatingGateway{ + ObjectMeta: metav1.ObjectMeta{ + Name: "terminating-gateway", + }, + Spec: v1alpha1.TerminatingGatewaySpec{ + Services: []v1alpha1.LinkedService{ + { + Name: "service-name", + }, + }, + }, + }, + }, + serverResponseConfig: serverResponseConfig{ + registering: true, + aclEnabled: true, + temGWRoleMissing: true, + }, + expectedConditions: []v1alpha1.Condition{{ + Type: "Synced", + Status: v1.ConditionFalse, + Reason: configentries.ConsulErrorACL, + Message: "", + }}, + }, + "registering - error reading policy": { + registration: &v1alpha1.Registration{ + TypeMeta: metav1.TypeMeta{ + Kind: "Registration", + APIVersion: "consul.hashicorp.com/v1alpha1", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "test-registration", + Finalizers: []string{configentries.RegistrationFinalizer}, + }, + Spec: v1alpha1.RegistrationSpec{ + ID: "node-id", + Node: "virtual-node", + Address: "127.0.0.1", + Datacenter: "dc1", + Service: v1alpha1.Service{ + ID: "service-id", + Name: "service-name", + Port: 8080, + Address: "127.0.0.1", + }, + }, + }, + terminatingGateways: []runtime.Object{ + &v1alpha1.TerminatingGateway{ + ObjectMeta: metav1.ObjectMeta{ + Name: "terminating-gateway", + }, + Spec: v1alpha1.TerminatingGatewaySpec{ + Services: []v1alpha1.LinkedService{ + { + Name: "service-name", + }, + }, + }, + }, + }, + serverResponseConfig: serverResponseConfig{ + registering: true, + aclEnabled: true, + errOnPolicyRead: true, + policyExists: true, + }, + expectedConditions: []v1alpha1.Condition{{ + Type: "Synced", + Status: v1.ConditionFalse, + Reason: configentries.ConsulErrorACL, + Message: "", + }}, + }, + "registering - policy does not exist - error creating policy": { + registration: &v1alpha1.Registration{ + TypeMeta: metav1.TypeMeta{ + Kind: "Registration", + APIVersion: "consul.hashicorp.com/v1alpha1", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "test-registration", + Finalizers: []string{configentries.RegistrationFinalizer}, + }, + Spec: v1alpha1.RegistrationSpec{ + ID: "node-id", + Node: "virtual-node", + Address: "127.0.0.1", + Datacenter: "dc1", + Service: v1alpha1.Service{ + ID: "service-id", + Name: "service-name", + Port: 8080, + Address: "127.0.0.1", + }, + }, + }, + terminatingGateways: []runtime.Object{ + &v1alpha1.TerminatingGateway{ + ObjectMeta: metav1.ObjectMeta{ + Name: "terminating-gateway", + }, + Spec: v1alpha1.TerminatingGatewaySpec{ + Services: []v1alpha1.LinkedService{ + { + Name: "service-name", + }, + }, + }, + }, + }, + serverResponseConfig: serverResponseConfig{ + registering: true, + aclEnabled: true, + errOnPolicyWrite: true, + }, + expectedConditions: []v1alpha1.Condition{{ + Type: "Synced", + Status: v1.ConditionFalse, + Reason: configentries.ConsulErrorACL, + Message: "", + }}, + }, + "registering - error updating role": { + registration: &v1alpha1.Registration{ + TypeMeta: metav1.TypeMeta{ + Kind: "Registration", + APIVersion: "consul.hashicorp.com/v1alpha1", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "test-registration", + Finalizers: []string{configentries.RegistrationFinalizer}, + }, + Spec: v1alpha1.RegistrationSpec{ + ID: "node-id", + Node: "virtual-node", + Address: "127.0.0.1", + Datacenter: "dc1", + Service: v1alpha1.Service{ + ID: "service-id", + Name: "service-name", + Port: 8080, + Address: "127.0.0.1", + }, + }, + }, + terminatingGateways: []runtime.Object{ + &v1alpha1.TerminatingGateway{ + ObjectMeta: metav1.ObjectMeta{ + Name: "terminating-gateway", + }, + Spec: v1alpha1.TerminatingGatewaySpec{ + Services: []v1alpha1.LinkedService{ + { + Name: "service-name", + }, + }, + }, + }, + }, + serverResponseConfig: serverResponseConfig{ + registering: true, + aclEnabled: true, + errOnRoleUpdate: true, + }, + expectedConditions: []v1alpha1.Condition{{ + Type: "Synced", + Status: v1.ConditionFalse, + Reason: configentries.ConsulErrorACL, + Message: "", + }}, + }, + "deregistering": { + registration: &v1alpha1.Registration{ + TypeMeta: metav1.TypeMeta{ + Kind: "Registration", + APIVersion: "consul.hashicorp.com/v1alpha1", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "test-registration", + Finalizers: []string{configentries.RegistrationFinalizer}, + DeletionTimestamp: &deletionTime, + }, + Spec: v1alpha1.RegistrationSpec{ + ID: "node-id", + Node: "virtual-node", + Address: "127.0.0.1", + Datacenter: "dc1", + Service: v1alpha1.Service{ + ID: "service-id", + Name: "service-name", + Port: 8080, + Address: "127.0.0.1", + }, + }, + }, + terminatingGateways: []runtime.Object{ + &v1alpha1.TerminatingGateway{ + ObjectMeta: metav1.ObjectMeta{ + Name: "terminating-gateway", + }, + Spec: v1alpha1.TerminatingGatewaySpec{ + Services: []v1alpha1.LinkedService{ + { + Name: "service-name", + }, + }, + }, + }, + }, + serverResponseConfig: serverResponseConfig{ + errOnDeregister: true, + }, + expectedConditions: []v1alpha1.Condition{{ + Type: "Synced", + Status: v1.ConditionFalse, + Reason: configentries.ConsulErrorDeregistration, + Message: "", + }}, + }, + "deregistering - ACLs enabled - terminating-gateway error updating role": { + registration: &v1alpha1.Registration{ + TypeMeta: metav1.TypeMeta{ + Kind: "Registration", + APIVersion: "consul.hashicorp.com/v1alpha1", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "test-registration", + Finalizers: []string{configentries.RegistrationFinalizer}, + DeletionTimestamp: &deletionTime, + }, + Spec: v1alpha1.RegistrationSpec{ + ID: "node-id", + Node: "virtual-node", + Address: "127.0.0.1", + Datacenter: "dc1", + Service: v1alpha1.Service{ + ID: "service-id", + Name: "service-name", + Port: 8080, + Address: "127.0.0.1", + }, + }, + }, + terminatingGateways: []runtime.Object{ + &v1alpha1.TerminatingGateway{ + ObjectMeta: metav1.ObjectMeta{ + Name: "terminating-gateway", + }, + Spec: v1alpha1.TerminatingGatewaySpec{ + Services: []v1alpha1.LinkedService{ + { + Name: "service-name", + }, + }, + }, + }, + }, + serverResponseConfig: serverResponseConfig{ + aclEnabled: true, + errOnRoleUpdate: true, + }, + expectedConditions: []v1alpha1.Condition{{ + Type: "Synced", + Status: v1.ConditionFalse, + Reason: configentries.ConsulErrorACL, + Message: "", + }}, + }, + "deregistering - ACLs enabled - terminating-gateway error deleting policy": { registration: &v1alpha1.Registration{ TypeMeta: metav1.TypeMeta{ Kind: "Registration", @@ -213,10 +713,28 @@ func TestReconcile_Failure(tt *testing.T) { }, }, }, + terminatingGateways: []runtime.Object{ + &v1alpha1.TerminatingGateway{ + ObjectMeta: metav1.ObjectMeta{ + Name: "terminating-gateway", + }, + Spec: v1alpha1.TerminatingGatewaySpec{ + Services: []v1alpha1.LinkedService{ + { + Name: "service-name", + }, + }, + }, + }, + }, + serverResponseConfig: serverResponseConfig{ + aclEnabled: true, + errOnPolicyDelete: true, + }, expectedConditions: []v1alpha1.Condition{{ Type: "Synced", Status: v1.ConditionFalse, - Reason: "ConsulErrorDeregistration", + Reason: configentries.ConsulErrorACL, Message: "", }}, }, @@ -227,29 +745,17 @@ func TestReconcile_Failure(tt *testing.T) { tt.Run(name, func(t *testing.T) { t.Parallel() s := runtime.NewScheme() - s.AddKnownTypes(v1alpha1.GroupVersion, &v1alpha1.Registration{}) + s.AddKnownTypes(v1alpha1.GroupVersion, &v1alpha1.Registration{}, &v1alpha1.TerminatingGateway{}, &v1alpha1.TerminatingGatewayList{}) ctx := context.Background() - consulServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - w.WriteHeader(500) - })) + consulServer, testClient := fakeConsulServer(t, tc.serverResponseConfig, tc.registration.Spec.Service.Name) defer consulServer.Close() - parsedURL, err := url.Parse(consulServer.URL) - require.NoError(t, err) - host := strings.Split(parsedURL.Host, ":")[0] - - port, err := strconv.Atoi(parsedURL.Port()) - require.NoError(t, err) - - testClient := &test.TestServerClient{ - Cfg: &consul.Config{APIClientConfig: &capi.Config{Address: host}, HTTPPort: port}, - Watcher: test.MockConnMgrForIPAndPort(t, host, port, false), - } - + runtimeObjs := []runtime.Object{tc.registration} + runtimeObjs = append(runtimeObjs, tc.terminatingGateways...) fakeClient := fake.NewClientBuilder(). WithScheme(s). - WithRuntimeObjects(tc.registration). + WithRuntimeObjects(runtimeObjs...). WithStatusSubresource(&v1alpha1.Registration{}). Build() @@ -261,7 +767,7 @@ func TestReconcile_Failure(tt *testing.T) { ConsulServerConnMgr: testClient.Watcher, } - _, err = controller.Reconcile(ctx, ctrl.Request{ + _, err := controller.Reconcile(ctx, ctrl.Request{ NamespacedName: types.NamespacedName{Name: tc.registration.Name, Namespace: tc.registration.Namespace}, }) require.Error(t, err) @@ -276,6 +782,200 @@ func TestReconcile_Failure(tt *testing.T) { t.Errorf("unexpected condition diff: %s", diff) } } + + require.ElementsMatch(t, fetchedReg.Finalizers, []string{configentries.RegistrationFinalizer}) }) } } + +func fakeConsulServer(t *testing.T, serverResponseConfig serverResponseConfig, serviceName string) (*httptest.Server, *test.TestServerClient) { + t.Helper() + mux := buildMux(t, serverResponseConfig, serviceName) + consulServer := httptest.NewServer(mux) + + parsedURL, err := url.Parse(consulServer.URL) + require.NoError(t, err) + host := strings.Split(parsedURL.Host, ":")[0] + + port, err := strconv.Atoi(parsedURL.Port()) + require.NoError(t, err) + + cfg := &consul.Config{APIClientConfig: &capi.Config{Address: host}, HTTPPort: port} + if serverResponseConfig.aclEnabled { + cfg.APIClientConfig.Token = "test-token" + } + + testClient := &test.TestServerClient{ + Cfg: cfg, + Watcher: test.MockConnMgrForIPAndPort(t, host, port, false), + } + + return consulServer, testClient +} + +func buildMux(t *testing.T, cfg serverResponseConfig, serviceName string) http.Handler { + t.Helper() + mux := http.NewServeMux() + mux.HandleFunc("/v1/catalog/register", func(w http.ResponseWriter, r *http.Request) { + if cfg.errOnRegister { + w.WriteHeader(500) + return + } + w.WriteHeader(200) + }) + + mux.HandleFunc("/v1/catalog/deregister", func(w http.ResponseWriter, r *http.Request) { + if cfg.errOnDeregister { + w.WriteHeader(500) + return + } + w.WriteHeader(200) + }) + + policyID, err := uuid.GenerateUUID() + require.NoError(t, err) + + mux.HandleFunc("/v1/acl/roles", func(w http.ResponseWriter, r *http.Request) { + entries := []*capi.ACLRole{ + { + ID: "754a8717-46e9-9f18-7f76-28dc0afafd19", + Name: "consul-consul-connect-inject-acl-role", + Description: "ACL Role for consul-consul-connect-injector", + Policies: []*capi.ACLLink{ + { + ID: "38511a9f-a309-11e2-7f67-7fea12056e7c", + Name: "connect-inject-policy", + }, + }, + }, + } + + if cfg.temGWRoleMissing { + val, err := json.Marshal(entries) + if err != nil { + w.WriteHeader(500) + return + } + w.WriteHeader(200) + w.Write(val) + return + } + + termGWPolicies := []*capi.ACLLink{ + { + ID: "b7e377d9-5e2b-b99c-3f06-139584cf47f8", + Name: "terminating-gateway-policy", + }, + } + + if !cfg.registering { + termGWPolicies = append(termGWPolicies, &capi.ACLLink{ + ID: policyID, + Name: fmt.Sprintf("%s-write-policy", serviceName), + }) + } + + termGWRole := &capi.ACLRole{ + ID: "61fc5051-96e9-7b67-69b5-98f7f6682563", + Name: "consul-consul-terminating-gateway-acl-role", + Description: "ACL Role for consul-consul-terminating-gateway", + Policies: termGWPolicies, + } + + entries = append(entries, termGWRole) + + val, err := json.Marshal(entries) + if err != nil { + w.WriteHeader(500) + return + } + w.WriteHeader(200) + w.Write(val) + }) + + mux.HandleFunc("/v1/acl/role/", func(w http.ResponseWriter, r *http.Request) { + if cfg.errOnRoleUpdate { + w.WriteHeader(500) + return + } + + role := &capi.ACLRole{ + ID: "61fc5051-96e9-7b67-69b5-98f7f6682563", + Name: "consul-consul-terminating-gateway-acl-role", + Description: "ACL Role for consul-consul-terminating-gateway", + Policies: []*capi.ACLLink{ + { + ID: "b7e377d9-5e2b-b99c-3f06-139584cf47f8", + Name: "terminating-gateway-policy", + }, + { + ID: policyID, + Name: fmt.Sprintf("%s-write-policy", serviceName), + }, + }, + } + val, err := json.Marshal(role) + if err != nil { + w.WriteHeader(500) + return + } + w.WriteHeader(200) + w.Write(val) + }) + + mux.HandleFunc("/v1/acl/policy/name/", func(w http.ResponseWriter, r *http.Request) { + if cfg.errOnPolicyRead { + w.WriteHeader(500) + return + } + + if !cfg.policyExists { + w.WriteHeader(404) + return + } + + policy := &capi.ACLPolicy{ + ID: policyID, + Name: fmt.Sprintf("%s-write-policy", serviceName), + } + + val, err := json.Marshal(policy) + if err != nil { + w.WriteHeader(500) + return + } + w.WriteHeader(200) + w.Write(val) + }) + + mux.HandleFunc("/v1/acl/policy/", func(w http.ResponseWriter, r *http.Request) { + switch r.Method { + case "GET": + if cfg.errOnPolicyWrite { + w.WriteHeader(500) + return + } + + policy := &capi.ACLPolicy{ + ID: policyID, + Name: fmt.Sprintf("%s-write-policy", serviceName), + } + + val, err := json.Marshal(policy) + if err != nil { + w.WriteHeader(500) + return + } + w.WriteHeader(200) + w.Write(val) + case "DELETE": + if cfg.errOnPolicyDelete { + w.WriteHeader(500) + return + } + w.WriteHeader(200) + } + }) + + return mux +} diff --git a/control-plane/subcommand/inject-connect/v1controllers.go b/control-plane/subcommand/inject-connect/v1controllers.go index 4b1de1e325..df7fff50c1 100644 --- a/control-plane/subcommand/inject-connect/v1controllers.go +++ b/control-plane/subcommand/inject-connect/v1controllers.go @@ -290,7 +290,7 @@ func (c *Command) configureV1Controllers(ctx context.Context, mgr manager.Manage ConsulServerConnMgr: watcher, Scheme: mgr.GetScheme(), Log: ctrl.Log.WithName("controller").WithName(apicommon.Registration), - }).SetupWithManager(mgr); err != nil { + }).SetupWithManager(ctx, mgr); err != nil { setupLog.Error(err, "unable to create controller", "controller", apicommon.Registration) return err }