diff --git a/control-plane/api/v1alpha1/registration_types.go b/control-plane/api/v1alpha1/registration_types.go index 739f85e87d..f3190aef87 100644 --- a/control-plane/api/v1alpha1/registration_types.go +++ b/control-plane/api/v1alpha1/registration_types.go @@ -11,9 +11,7 @@ import ( capi "github.com/hashicorp/consul/api" - corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/types" ) func init() { @@ -290,24 +288,286 @@ func (r *Registration) ToCatalogDeregistration() *capi.CatalogDeregistration { } } -func (r *Registration) NamespacedName() types.NamespacedName { - return types.NamespacedName{ - Namespace: r.Namespace, - Name: r.Name, +func (r *Registration) EqualExceptStatus(other *Registration) bool { + if r == nil || other == nil { + return false } + + if r.Spec.ID != other.Spec.ID { + return false + } + + if r.Spec.Node != other.Spec.Node { + return false + } + + if r.Spec.Address != other.Spec.Address { + return false + } + + if !maps.Equal(r.Spec.TaggedAddresses, other.Spec.TaggedAddresses) { + return false + } + + if !maps.Equal(r.Spec.NodeMeta, other.Spec.NodeMeta) { + return false + } + + if r.Spec.Datacenter != other.Spec.Datacenter { + return false + } + + if !r.Spec.Service.Equal(&other.Spec.Service) { + return false + } + + if r.Spec.SkipNodeUpdate != other.Spec.SkipNodeUpdate { + return false + } + + if r.Spec.Partition != other.Spec.Partition { + return false + } + + if !r.Spec.HealthCheck.Equal(other.Spec.HealthCheck) { + return false + } + + if !r.Spec.Locality.Equal(other.Spec.Locality) { + return false + } + + return true } -// SetSyncedCondition sets the synced condition on the Registration. -func (r *Registration) SetSyncedCondition(status corev1.ConditionStatus, reason string, message string) { - r.Status.Conditions = Conditions{ - { - Type: ConditionSynced, - Status: status, - LastTransitionTime: metav1.Now(), - Reason: reason, - Message: message, - }, +func (s *Service) Equal(other *Service) bool { + if s == nil && other == nil { + return true + } + + if s == nil || other == nil { + return false + } + + if s.ID != other.ID { + return false + } + + if s.Name != other.Name { + return false + } + + if !slices.Equal(s.Tags, other.Tags) { + return false + } + + if !maps.Equal(s.Meta, other.Meta) { + return false + } + + if s.Port != other.Port { + return false + } + + if s.Address != other.Address { + return false + } + + if s.SocketPath != other.SocketPath { + return false + } + + if !maps.Equal(s.TaggedAddresses, other.TaggedAddresses) { + return false + } + + if !s.Weights.Equal(other.Weights) { + return false + } + + if s.EnableTagOverride != other.EnableTagOverride { + return false + } + + if s.Namespace != other.Namespace { + return false + } + + if s.Partition != other.Partition { + return false } + + if !s.Locality.Equal(other.Locality) { + return false + } + return true +} + +func (l *Locality) Equal(other *Locality) bool { + if l == nil && other == nil { + return true + } + + if l == nil || other == nil { + return false + } + if l.Region != other.Region { + return false + } + if l.Zone != other.Zone { + return false + } + return true +} + +func (w Weights) Equal(other Weights) bool { + if w.Passing != other.Passing { + return false + } + + if w.Warning != other.Warning { + return false + } + return true +} + +func (h *HealthCheck) Equal(other *HealthCheck) bool { + if h == nil && other == nil { + return true + } + + if h == nil || other == nil { + return false + } + + if h.Node != other.Node { + return false + } + + if h.CheckID != other.CheckID { + return false + } + + if h.Name != other.Name { + return false + } + + if h.Status != other.Status { + return false + } + + if h.Notes != other.Notes { + return false + } + + if h.Output != other.Output { + return false + } + + if h.ServiceID != other.ServiceID { + return false + } + + if h.ServiceName != other.ServiceName { + return false + } + + if h.Type != other.Type { + return false + } + + if h.ExposedPort != other.ExposedPort { + return false + } + + if h.Namespace != other.Namespace { + return false + } + + if h.Partition != other.Partition { + return false + } + + if !h.Definition.Equal(other.Definition) { + return false + } + + return true +} + +func (h HealthCheckDefinition) Equal(other HealthCheckDefinition) bool { + if h.HTTP != other.HTTP { + return false + } + + if len(h.Header) != len(other.Header) { + return false + } + + for k, v := range h.Header { + otherValues, ok := other.Header[k] + if !ok { + return false + } + + if !slices.Equal(v, otherValues) { + return false + } + } + + if h.Method != other.Method { + return false + } + + if h.Body != other.Body { + return false + } + + if h.TLSServerName != other.TLSServerName { + return false + } + + if h.TLSSkipVerify != other.TLSSkipVerify { + return false + } + + if h.TCP != other.TCP { + return false + } + + if h.TCPUseTLS != other.TCPUseTLS { + return false + } + + if h.UDP != other.UDP { + return false + } + + if h.GRPC != other.GRPC { + return false + } + + if h.OSService != other.OSService { + return false + } + + if h.GRPCUseTLS != other.GRPCUseTLS { + return false + } + + if h.IntervalDuration != other.IntervalDuration { + return false + } + + if h.TimeoutDuration != other.TimeoutDuration { + return false + } + + if h.DeregisterCriticalServiceAfterDuration != other.DeregisterCriticalServiceAfterDuration { + return false + } + + return true } func (r *Registration) KubernetesName() string { diff --git a/control-plane/catalog/registration/cache.go b/control-plane/catalog/registration/cache.go new file mode 100644 index 0000000000..74606660e3 --- /dev/null +++ b/control-plane/catalog/registration/cache.go @@ -0,0 +1,296 @@ +package registration + +import ( + "context" + "errors" + "fmt" + "slices" + "strings" + "sync" + + mapset "github.com/deckarep/golang-set/v2" + "github.com/go-logr/logr" + "github.com/hashicorp/consul-k8s/control-plane/api/v1alpha1" + "github.com/hashicorp/consul-k8s/control-plane/consul" + capi "github.com/hashicorp/consul/api" +) + +const NotInServiceMeshFilter = "ServiceMeta[\"managed-by\"] != \"consul-k8s-endpoints-controller\"" + +type RegistrationCache struct { + ConsulClientConfig *consul.Config + ConsulServerConnMgr consul.ServerConnectionManager + serviceMtx *sync.Mutex + Services map[string]*v1alpha1.Registration + synced chan struct{} + UpdateChan chan string +} + +func NewRegistrationCache(consulClientConfig *consul.Config, consulServerConnMgr consul.ServerConnectionManager) *RegistrationCache { + return &RegistrationCache{ + ConsulClientConfig: consulClientConfig, + ConsulServerConnMgr: consulServerConnMgr, + serviceMtx: &sync.Mutex{}, + Services: make(map[string]*v1alpha1.Registration), + UpdateChan: make(chan string), + synced: make(chan struct{}), + } +} + +// waitSynced is used to coordinate with the caller when the cache is initially filled. +func (c *RegistrationCache) waitSynced(ctx context.Context) { + select { + case <-c.synced: + return + case <-ctx.Done(): + return + } +} + +func (c *RegistrationCache) run(ctx context.Context, log logr.Logger) { + once := &sync.Once{} + opts := &capi.QueryOptions{Filter: NotInServiceMeshFilter} + + for { + select { + case <-ctx.Done(): + return + default: + + client, err := consul.NewClientFromConnMgr(c.ConsulClientConfig, c.ConsulServerConnMgr) + if err != nil { + log.Error(err, "error initializing consul client") + continue + } + entries, meta, err := client.Catalog().Services(opts.WithContext(ctx)) + if err != nil { + // if we timeout we don't care about the error message because it's expected to happen on long polls + // any other error we want to alert on + if !strings.Contains(strings.ToLower(err.Error()), "timeout") && + !strings.Contains(strings.ToLower(err.Error()), "no such host") && + !strings.Contains(strings.ToLower(err.Error()), "connection refused") { + log.Error(err, "error fetching registrations") + } + continue + } + + diffs := mapset.NewSet[string]() + c.serviceMtx.Lock() + for svc := range c.Services { + if _, ok := entries[svc]; !ok { + diffs.Add(svc) + } + } + c.serviceMtx.Unlock() + + for _, svc := range diffs.ToSlice() { + log.Info("consul deregistered service", "svcName", svc) + c.UpdateChan <- svc + } + + opts.WaitIndex = meta.LastIndex + once.Do(func() { + log.Info("Initial sync complete") + c.synced <- struct{}{} + }) + } + } +} + +func (c *RegistrationCache) get(svcName string) (*v1alpha1.Registration, bool) { + c.serviceMtx.Lock() + defer c.serviceMtx.Unlock() + val, ok := c.Services[svcName] + return val, ok +} + +func (c *RegistrationCache) aclsEnabled() bool { + return c.ConsulClientConfig.APIClientConfig.Token != "" || c.ConsulClientConfig.APIClientConfig.TokenFile != "" +} + +func (c *RegistrationCache) registerService(log logr.Logger, reg *v1alpha1.Registration) error { + client, err := consul.NewClientFromConnMgr(c.ConsulClientConfig, c.ConsulServerConnMgr) + if err != nil { + return err + } + + regReq, err := reg.ToCatalogRegistration() + if err != nil { + return err + } + + _, err = client.Catalog().Register(regReq, nil) + if err != nil { + log.Error(err, "error registering service", "svcName", regReq.Service.Service) + return err + } + + c.serviceMtx.Lock() + defer c.serviceMtx.Unlock() + c.Services[reg.Spec.Service.Name] = reg + + log.Info("Successfully registered service", "svcName", regReq.Service.Service) + + return nil +} + +func (c *RegistrationCache) updateTermGWACLRole(log logr.Logger, registration *v1alpha1.Registration, termGWsToUpdate []v1alpha1.TerminatingGateway) error { + if len(termGWsToUpdate) == 0 { + log.Info("terminating gateway not found") + return nil + } + + client, err := consul.NewClientFromConnMgr(c.ConsulClientConfig, c.ConsulServerConnMgr) + if err != nil { + return err + } + + roles, _, err := client.ACL().RoleList(nil) + if err != nil { + log.Error(err, "error reading role list") + return err + } + + policy := &capi.ACLPolicy{ + Name: servicePolicyName(registration.Spec.Service.Name), + Description: "Write policy for terminating gateways for external service", + Rules: fmt.Sprintf(`service %q { policy = "write" }`, registration.Spec.Service.Name), + Datacenters: []string{registration.Spec.Datacenter}, + Namespace: registration.Spec.Service.Namespace, + Partition: registration.Spec.Service.Partition, + } + + existingPolicy, _, err := client.ACL().PolicyReadByName(policy.Name, nil) + if err != nil { + log.Error(err, "error reading policy") + return err + } + + if existingPolicy == nil { + policy, _, err = client.ACL().PolicyCreate(policy, nil) + if err != nil { + return fmt.Errorf("error creating policy: %w", err) + } + } else { + policy = existingPolicy + } + + var mErr error + for _, termGW := range termGWsToUpdate { + var role *capi.ACLRole + for _, r := range roles { + if strings.HasSuffix(r.Name, fmt.Sprintf("-%s-acl-role", termGW.Name)) { + role = r + break + } + } + + if role == nil { + log.Info("terminating gateway role not found", "terminatingGatewayName", termGW.Name) + mErr = errors.Join(mErr, fmt.Errorf("terminating gateway role not found for %q", termGW.Name)) + continue + } + + role.Policies = append(role.Policies, &capi.ACLRolePolicyLink{Name: policy.Name, ID: policy.ID}) + + _, _, err = client.ACL().RoleUpdate(role, nil) + if err != nil { + log.Error(err, "error updating role", "roleName", role.Name) + mErr = errors.Join(mErr, fmt.Errorf("error updating role %q", role.Name)) + continue + } + } + + return mErr +} + +func (c *RegistrationCache) deregisterService(log logr.Logger, reg *v1alpha1.Registration) error { + client, err := consul.NewClientFromConnMgr(c.ConsulClientConfig, c.ConsulServerConnMgr) + if err != nil { + return err + } + + deRegReq := reg.ToCatalogDeregistration() + _, err = client.Catalog().Deregister(deRegReq, nil) + if err != nil { + log.Error(err, "error deregistering service", "svcID", deRegReq.ServiceID) + return err + } + + c.serviceMtx.Lock() + defer c.serviceMtx.Unlock() + delete(c.Services, reg.Spec.Service.Name) + + log.Info("Successfully deregistered service", "svcID", deRegReq.ServiceID) + return nil +} + +func (c *RegistrationCache) removeTermGWACLRole(log logr.Logger, registration *v1alpha1.Registration, termGWsToUpdate []v1alpha1.TerminatingGateway) error { + if len(termGWsToUpdate) == 0 { + log.Info("terminating gateway not found") + return nil + } + + client, err := consul.NewClientFromConnMgr(c.ConsulClientConfig, c.ConsulServerConnMgr) + if err != nil { + return err + } + + roles, _, err := client.ACL().RoleList(nil) + if err != nil { + return err + } + + var mErr error + for _, termGW := range termGWsToUpdate { + var role *capi.ACLRole + for _, r := range roles { + if strings.HasSuffix(r.Name, fmt.Sprintf("-%s-acl-role", termGW.Name)) { + role = r + break + } + } + + if role == nil { + log.Info("terminating gateway role not found", "terminatingGatewayName", termGW.Name) + mErr = errors.Join(mErr, fmt.Errorf("terminating gateway role not found for %q", termGW.Name)) + continue + } + + var policyID string + + expectedPolicyName := servicePolicyName(registration.Spec.Service.Name) + role.Policies = slices.DeleteFunc(role.Policies, func(i *capi.ACLRolePolicyLink) bool { + if i.Name == expectedPolicyName { + policyID = i.ID + return true + } + return false + }) + + if policyID == "" { + log.Info("policy not found on terminating gateway role", "policyName", expectedPolicyName, "terminatingGatewayName", termGW.Name) + continue + } + + _, _, err = client.ACL().RoleUpdate(role, nil) + if err != nil { + log.Error(err, "error updating role", "roleName", role.Name) + mErr = errors.Join(mErr, fmt.Errorf("error updating role %q", role.Name)) + continue + } + + _, err = client.ACL().PolicyDelete(policyID, nil) + if err != nil { + log.Error(err, "error deleting service policy", "policyID", policyID, "policyName", expectedPolicyName) + mErr = errors.Join(mErr, fmt.Errorf("error deleting service ACL policy %q", policyID)) + continue + } + } + + return mErr +} + +func servicePolicyName(name string) string { + return fmt.Sprintf("%s-write-policy", name) +} diff --git a/control-plane/catalog/registration/registrations_controller.go b/control-plane/catalog/registration/registrations_controller.go new file mode 100644 index 0000000000..eabcaae979 --- /dev/null +++ b/control-plane/catalog/registration/registrations_controller.go @@ -0,0 +1,297 @@ +// Copyright (c) HashiCorp, Inc. +// SPDX-License-Identifier: MPL-2.0 + +package registration + +import ( + "context" + "fmt" + "slices" + "time" + + "github.com/go-logr/logr" + k8serrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/handler" + "sigs.k8s.io/controller-runtime/pkg/reconcile" + + "github.com/hashicorp/consul-k8s/control-plane/api/v1alpha1" + "github.com/hashicorp/consul-k8s/control-plane/controllers/configentries" +) + +const ( + RegistrationFinalizer = "registration.finalizers.consul.hashicorp.com" + registrationByServiceNameIndex = "registrationName" +) + +var ( + ErrRegisteringService = fmt.Errorf("error registering service") + ErrDeregisteringService = fmt.Errorf("error deregistering service") + ErrUpdatingACLRoles = fmt.Errorf("error updating ACL roles") + ErrRemovingACLRoles = fmt.Errorf("error removing ACL roles") +) + +// RegistrationsController is the controller for Registrations resources. +type RegistrationsController struct { + client.Client + configentries.FinalizerPatcher + Scheme *runtime.Scheme + Cache *RegistrationCache + Log logr.Logger +} + +// +kubebuilder:rbac:groups=consul.hashicorp.com,resources=servicerouters,verbs=get;list;watch;create;update;patch;delete +// +kubebuilder:rbac:groups=consul.hashicorp.com,resources=servicerouters/status,verbs=get;update;patch + +func (r *RegistrationsController) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { + log := r.Log.V(1).WithValues("registration", req.NamespacedName) + log.Info("Reconciling Registaration") + + registration := &v1alpha1.Registration{} + // get the registration + if err := r.Client.Get(ctx, req.NamespacedName, registration); err != nil { + if !k8serrors.IsNotFound(err) { + log.Error(err, "unable to get registration") + } + return ctrl.Result{}, client.IgnoreNotFound(err) + } + + cachedRegistration, ok := r.Cache.get(registration.Spec.Service.Name) + if slices.ContainsFunc(registration.Status.Conditions, func(c v1alpha1.Condition) bool { return c.Type == ConditionDeregistered }) { + if ok && registration.EqualExceptStatus(cachedRegistration) { + log.Info("Registration is in sync") + // registration is already in sync so we do nothing, this happens when consul deregisters a service + // and we update the status to show that consul deregistered it + return ctrl.Result{}, nil + } + } + + log.Info("need to reconcile") + + // deletion request + if !registration.ObjectMeta.DeletionTimestamp.IsZero() { + result := r.handleDeletion(ctx, log, registration) + + if result.hasErrors() { + err := r.UpdateStatus(ctx, log, registration, result) + if err != nil { + log.Error(err, "failed to update Registration status", "name", registration.Name, "namespace", registration.Namespace) + } + return ctrl.Result{}, result.errors() + } + return ctrl.Result{}, nil + } + + // registration request + result := r.handleRegistration(ctx, log, registration) + err := r.UpdateStatus(ctx, log, registration, result) + if err != nil { + log.Error(err, "failed to update Registration status", "name", registration.Name, "namespace", registration.Namespace) + } + if result.hasErrors() { + return ctrl.Result{}, result.errors() + } + + return ctrl.Result{}, nil +} + +func (c *RegistrationsController) watchForDeregistrations(ctx context.Context) { + for { + select { + case <-ctx.Done(): + return + case svc := <-c.Cache.UpdateChan: + // get all registrations for the service + regList := &v1alpha1.RegistrationList{} + err := c.Client.List(context.Background(), regList, client.MatchingFields{registrationByServiceNameIndex: svc}) + if err != nil { + c.Log.Error(err, "error listing registrations by service name", "serviceName", svc) + continue + } + for _, reg := range regList.Items { + + err := c.UpdateStatus(context.Background(), c.Log, ®, Result{Registering: false, ConsulDeregistered: true}) + if err != nil { + c.Log.Error(err, "failed to update Registration status", "name", reg.Name, "namespace", reg.Namespace) + } + } + } + } +} + +func (r *RegistrationsController) handleRegistration(ctx context.Context, log logr.Logger, registration *v1alpha1.Registration) Result { + log.Info("Registering service") + + result := Result{Registering: true} + + patch := r.AddFinalizersPatch(registration, RegistrationFinalizer) + err := r.Patch(ctx, registration, patch) + if err != nil { + err = fmt.Errorf("error adding finalizer: %w", err) + result.Finalizer = err + return result + } + + err = r.Cache.registerService(log, registration) + if err != nil { + result.Sync = err + result.Registration = fmt.Errorf("%w: %s", ErrRegisteringService, err) + return result + } + + if r.Cache.aclsEnabled() { + termGWsToUpdate, err := r.terminatingGatewaysToUpdate(ctx, log, registration) + if err != nil { + result.Sync = err + result.ACLUpdate = fmt.Errorf("%w: %s", ErrUpdatingACLRoles, err) + return result + } + + err = r.Cache.updateTermGWACLRole(log, registration, termGWsToUpdate) + if err != nil { + result.Sync = err + result.ACLUpdate = fmt.Errorf("%w: %s", ErrUpdatingACLRoles, err) + return result + } + } + return result +} + +func (r *RegistrationsController) terminatingGatewaysToUpdate(ctx context.Context, log logr.Logger, registration *v1alpha1.Registration) ([]v1alpha1.TerminatingGateway, error) { + termGWList := &v1alpha1.TerminatingGatewayList{} + err := r.Client.List(ctx, termGWList) + if err != nil { + log.Error(err, "error listing terminating gateways") + return nil, err + } + + termGWsToUpdate := make([]v1alpha1.TerminatingGateway, 0, len(termGWList.Items)) + for _, termGW := range termGWList.Items { + if slices.ContainsFunc(termGW.Spec.Services, termGWContainsService(registration)) { + termGWsToUpdate = append(termGWsToUpdate, termGW) + } + } + + return termGWsToUpdate, nil +} + +func termGWContainsService(registration *v1alpha1.Registration) func(v1alpha1.LinkedService) bool { + return func(svc v1alpha1.LinkedService) bool { + return svc.Name == registration.Spec.Service.Name + } +} + +func (r *RegistrationsController) handleDeletion(ctx context.Context, log logr.Logger, registration *v1alpha1.Registration) Result { + log.Info("Deregistering service") + result := Result{Registering: false} + err := r.Cache.deregisterService(log, registration) + if err != nil { + result.Sync = err + result.Deregistration = fmt.Errorf("%w: %s", ErrDeregisteringService, err) + return result + } + + if r.Cache.aclsEnabled() { + termGWsToUpdate, err := r.terminatingGatewaysToUpdate(ctx, log, registration) + if err != nil { + result.Sync = err + result.ACLUpdate = fmt.Errorf("%w: %s", ErrRemovingACLRoles, err) + return result + } + + err = r.Cache.removeTermGWACLRole(log, registration, termGWsToUpdate) + if err != nil { + result.Sync = err + result.ACLUpdate = fmt.Errorf("%w: %s", ErrRemovingACLRoles, err) + return result + } + } + + patch := r.RemoveFinalizersPatch(registration, RegistrationFinalizer) + err = r.Patch(ctx, registration, patch) + if err != nil { + result.Finalizer = err + return result + } + + return result +} + +func (r *RegistrationsController) UpdateStatus(ctx context.Context, log logr.Logger, registration *v1alpha1.Registration, result Result) error { + registration.Status.LastSyncedTime = &metav1.Time{Time: time.Now()} + registration.Status.Conditions = v1alpha1.Conditions{ + syncedCondition(result), + } + + if result.Registering { + registration.Status.Conditions = append(registration.Status.Conditions, registrationCondition(result)) + } else { + registration.Status.Conditions = append(registration.Status.Conditions, deregistrationCondition(result)) + } + + if r.Cache.aclsEnabled() { + registration.Status.Conditions = append(registration.Status.Conditions, aclCondition(result)) + } + + err := r.Status().Update(ctx, registration) + if err != nil { + return err + } + return nil +} + +func (r *RegistrationsController) Logger(name types.NamespacedName) logr.Logger { + return r.Log.WithValues("request", name) +} + +func (r *RegistrationsController) SetupWithManager(ctx context.Context, mgr ctrl.Manager) error { + // setup the cache + go r.Cache.run(ctx, r.Log) + r.Cache.waitSynced(ctx) + + go r.watchForDeregistrations(ctx) + + // setup the index to lookup registrations by service name + if err := mgr.GetFieldIndexer().IndexField(ctx, &v1alpha1.Registration{}, registrationByServiceNameIndex, indexerFn); err != nil { + return err + } + + return ctrl.NewControllerManagedBy(mgr). + For(&v1alpha1.Registration{}). + Watches(&v1alpha1.TerminatingGateway{}, handler.EnqueueRequestsFromMapFunc(r.transformTerminatingGateways)). + Complete(r) +} + +func indexerFn(o client.Object) []string { + reg := o.(*v1alpha1.Registration) + return []string{reg.Spec.Service.Name} +} + +func (r *RegistrationsController) transformTerminatingGateways(ctx context.Context, o client.Object) []reconcile.Request { + termGW := o.(*v1alpha1.TerminatingGateway) + reqs := make([]reconcile.Request, 0, len(termGW.Spec.Services)) + for _, svc := range termGW.Spec.Services { + // lookup registrationList by service name and add it to the reconcile request + registrationList := &v1alpha1.RegistrationList{} + + err := r.Client.List(ctx, registrationList, client.MatchingFields{registrationByServiceNameIndex: svc.Name}) + if err != nil { + r.Log.Error(err, "error listing registrations by service name", "serviceName", svc.Name) + continue + } + + for _, reg := range registrationList.Items { + reqs = append(reqs, reconcile.Request{ + NamespacedName: types.NamespacedName{ + Name: reg.Name, + Namespace: reg.Namespace, + }, + }) + } + } + return reqs +} diff --git a/control-plane/controllers/configentries/registrations_controller_test.go b/control-plane/catalog/registration/registrations_controller_test.go similarity index 79% rename from control-plane/controllers/configentries/registrations_controller_test.go rename to control-plane/catalog/registration/registrations_controller_test.go index 805be4a014..ccbe053d06 100644 --- a/control-plane/controllers/configentries/registrations_controller_test.go +++ b/control-plane/catalog/registration/registrations_controller_test.go @@ -1,7 +1,7 @@ // Copyright (c) HashiCorp, Inc. // SPDX-License-Identifier: MPL-2.0 -package configentries_test +package registration_test import ( "context" @@ -29,8 +29,8 @@ import ( "github.com/hashicorp/go-uuid" "github.com/hashicorp/consul-k8s/control-plane/api/v1alpha1" + "github.com/hashicorp/consul-k8s/control-plane/catalog/registration" "github.com/hashicorp/consul-k8s/control-plane/consul" - "github.com/hashicorp/consul-k8s/control-plane/controllers/configentries" "github.com/hashicorp/consul-k8s/control-plane/helper/test" ) @@ -64,7 +64,7 @@ func TestReconcile_Success(tt *testing.T) { }, ObjectMeta: metav1.ObjectMeta{ Name: "test-registration", - Finalizers: []string{configentries.RegistrationFinalizer}, + Finalizers: []string{registration.RegistrationFinalizer}, }, Spec: v1alpha1.RegistrationSpec{ ID: "node-id", @@ -94,13 +94,21 @@ func TestReconcile_Success(tt *testing.T) { }, }, serverResponseConfig: serverResponseConfig{registering: true}, - expectedFinalizers: []string{configentries.RegistrationFinalizer}, - expectedConditions: []v1alpha1.Condition{{ - Type: "Synced", - Status: v1.ConditionTrue, - Reason: "", - Message: "", - }}, + expectedFinalizers: []string{registration.RegistrationFinalizer}, + expectedConditions: []v1alpha1.Condition{ + { + Type: v1alpha1.ConditionSynced, + Status: v1.ConditionTrue, + Reason: "", + Message: "", + }, + { + Type: registration.ConditionRegistered, + Status: v1.ConditionTrue, + Reason: "", + Message: "", + }, + }, }, "registering -- ACLs enabled and policy does not exist": { registration: &v1alpha1.Registration{ @@ -110,7 +118,7 @@ func TestReconcile_Success(tt *testing.T) { }, ObjectMeta: metav1.ObjectMeta{ Name: "test-registration", - Finalizers: []string{configentries.RegistrationFinalizer}, + Finalizers: []string{registration.RegistrationFinalizer}, }, Spec: v1alpha1.RegistrationSpec{ ID: "node-id", @@ -143,13 +151,27 @@ func TestReconcile_Success(tt *testing.T) { registering: true, aclEnabled: true, }, - expectedFinalizers: []string{configentries.RegistrationFinalizer}, - expectedConditions: []v1alpha1.Condition{{ - Type: "Synced", - Status: v1.ConditionTrue, - Reason: "", - Message: "", - }}, + expectedFinalizers: []string{registration.RegistrationFinalizer}, + expectedConditions: []v1alpha1.Condition{ + { + Type: v1alpha1.ConditionSynced, + Status: v1.ConditionTrue, + Reason: "", + Message: "", + }, + { + Type: registration.ConditionRegistered, + Status: v1.ConditionTrue, + Reason: "", + Message: "", + }, + { + Type: registration.ConditionACLsUpdated, + Status: v1.ConditionTrue, + Reason: "", + Message: "", + }, + }, }, "registering -- ACLs enabled and policy does exists": { registration: &v1alpha1.Registration{ @@ -159,7 +181,7 @@ func TestReconcile_Success(tt *testing.T) { }, ObjectMeta: metav1.ObjectMeta{ Name: "test-registration", - Finalizers: []string{configentries.RegistrationFinalizer}, + Finalizers: []string{registration.RegistrationFinalizer}, }, Spec: v1alpha1.RegistrationSpec{ ID: "node-id", @@ -193,13 +215,27 @@ func TestReconcile_Success(tt *testing.T) { aclEnabled: true, policyExists: true, }, - expectedFinalizers: []string{configentries.RegistrationFinalizer}, - expectedConditions: []v1alpha1.Condition{{ - Type: "Synced", - Status: v1.ConditionTrue, - Reason: "", - Message: "", - }}, + expectedFinalizers: []string{registration.RegistrationFinalizer}, + expectedConditions: []v1alpha1.Condition{ + { + Type: v1alpha1.ConditionSynced, + Status: v1.ConditionTrue, + Reason: "", + Message: "", + }, + { + Type: registration.ConditionRegistered, + Status: v1.ConditionTrue, + Reason: "", + Message: "", + }, + { + Type: registration.ConditionACLsUpdated, + Status: v1.ConditionTrue, + Reason: "", + Message: "", + }, + }, }, "deregistering": { registration: &v1alpha1.Registration{ @@ -209,7 +245,7 @@ func TestReconcile_Success(tt *testing.T) { }, ObjectMeta: metav1.ObjectMeta{ Name: "test-registration", - Finalizers: []string{configentries.RegistrationFinalizer}, + Finalizers: []string{registration.RegistrationFinalizer}, DeletionTimestamp: &deletionTime, }, Spec: v1alpha1.RegistrationSpec{ @@ -253,7 +289,7 @@ func TestReconcile_Success(tt *testing.T) { }, ObjectMeta: metav1.ObjectMeta{ Name: "test-registration", - Finalizers: []string{configentries.RegistrationFinalizer}, + Finalizers: []string{registration.RegistrationFinalizer}, DeletionTimestamp: &deletionTime, }, Spec: v1alpha1.RegistrationSpec{ @@ -310,12 +346,11 @@ func TestReconcile_Success(tt *testing.T) { WithStatusSubresource(&v1alpha1.Registration{}). Build() - controller := &configentries.RegistrationsController{ - Client: fakeClient, - Log: logrtest.NewTestLogger(t), - Scheme: s, - ConsulClientConfig: testClient.Cfg, - ConsulServerConnMgr: testClient.Watcher, + controller := ®istration.RegistrationsController{ + Client: fakeClient, + Log: logrtest.NewTestLogger(t), + Scheme: s, + Cache: registration.NewRegistrationCache(testClient.Cfg, testClient.Watcher), } _, err := controller.Reconcile(ctx, ctrl.Request{ @@ -355,7 +390,7 @@ func TestReconcile_Failure(tt *testing.T) { }, ObjectMeta: metav1.ObjectMeta{ Name: "test-registration", - Finalizers: []string{configentries.RegistrationFinalizer}, + Finalizers: []string{registration.RegistrationFinalizer}, }, Spec: v1alpha1.RegistrationSpec{ ID: "node-id", @@ -388,12 +423,18 @@ func TestReconcile_Failure(tt *testing.T) { registering: true, errOnRegister: true, }, - expectedConditions: []v1alpha1.Condition{{ - Type: "Synced", - Status: v1.ConditionFalse, - Reason: configentries.ConsulErrorRegistration, - Message: "", - }}, + expectedConditions: []v1alpha1.Condition{ + { + Type: v1alpha1.ConditionSynced, + Status: v1.ConditionFalse, + Reason: registration.SyncError, + }, + { + Type: registration.ConditionRegistered, + Status: v1.ConditionFalse, + Reason: registration.ConsulErrorRegistration, + }, + }, }, "registering - terminating gateway acl role not found": { registration: &v1alpha1.Registration{ @@ -403,7 +444,7 @@ func TestReconcile_Failure(tt *testing.T) { }, ObjectMeta: metav1.ObjectMeta{ Name: "test-registration", - Finalizers: []string{configentries.RegistrationFinalizer}, + Finalizers: []string{registration.RegistrationFinalizer}, }, Spec: v1alpha1.RegistrationSpec{ ID: "node-id", @@ -437,12 +478,22 @@ func TestReconcile_Failure(tt *testing.T) { aclEnabled: true, temGWRoleMissing: true, }, - expectedConditions: []v1alpha1.Condition{{ - Type: "Synced", - Status: v1.ConditionFalse, - Reason: configentries.ConsulErrorACL, - Message: "", - }}, + expectedConditions: []v1alpha1.Condition{ + { + Type: v1alpha1.ConditionSynced, + Status: v1.ConditionFalse, + Reason: registration.SyncError, + }, + { + Type: registration.ConditionRegistered, + Status: v1.ConditionTrue, + }, + { + Type: registration.ConditionACLsUpdated, + Status: v1.ConditionFalse, + Reason: registration.ConsulErrorACL, + }, + }, }, "registering - error reading policy": { registration: &v1alpha1.Registration{ @@ -452,7 +503,7 @@ func TestReconcile_Failure(tt *testing.T) { }, ObjectMeta: metav1.ObjectMeta{ Name: "test-registration", - Finalizers: []string{configentries.RegistrationFinalizer}, + Finalizers: []string{registration.RegistrationFinalizer}, }, Spec: v1alpha1.RegistrationSpec{ ID: "node-id", @@ -487,12 +538,22 @@ func TestReconcile_Failure(tt *testing.T) { errOnPolicyRead: true, policyExists: true, }, - expectedConditions: []v1alpha1.Condition{{ - Type: "Synced", - Status: v1.ConditionFalse, - Reason: configentries.ConsulErrorACL, - Message: "", - }}, + expectedConditions: []v1alpha1.Condition{ + { + Type: v1alpha1.ConditionSynced, + Status: v1.ConditionFalse, + Reason: registration.SyncError, + }, + { + Type: registration.ConditionRegistered, + Status: v1.ConditionTrue, + }, + { + Type: registration.ConditionACLsUpdated, + Status: v1.ConditionFalse, + Reason: registration.ConsulErrorACL, + }, + }, }, "registering - policy does not exist - error creating policy": { registration: &v1alpha1.Registration{ @@ -502,7 +563,7 @@ func TestReconcile_Failure(tt *testing.T) { }, ObjectMeta: metav1.ObjectMeta{ Name: "test-registration", - Finalizers: []string{configentries.RegistrationFinalizer}, + Finalizers: []string{registration.RegistrationFinalizer}, }, Spec: v1alpha1.RegistrationSpec{ ID: "node-id", @@ -536,12 +597,22 @@ func TestReconcile_Failure(tt *testing.T) { aclEnabled: true, errOnPolicyWrite: true, }, - expectedConditions: []v1alpha1.Condition{{ - Type: "Synced", - Status: v1.ConditionFalse, - Reason: configentries.ConsulErrorACL, - Message: "", - }}, + expectedConditions: []v1alpha1.Condition{ + { + Type: v1alpha1.ConditionSynced, + Status: v1.ConditionFalse, + Reason: registration.SyncError, + }, + { + Type: registration.ConditionRegistered, + Status: v1.ConditionTrue, + }, + { + Type: registration.ConditionACLsUpdated, + Status: v1.ConditionFalse, + Reason: registration.ConsulErrorACL, + }, + }, }, "registering - error updating role": { registration: &v1alpha1.Registration{ @@ -551,7 +622,7 @@ func TestReconcile_Failure(tt *testing.T) { }, ObjectMeta: metav1.ObjectMeta{ Name: "test-registration", - Finalizers: []string{configentries.RegistrationFinalizer}, + Finalizers: []string{registration.RegistrationFinalizer}, }, Spec: v1alpha1.RegistrationSpec{ ID: "node-id", @@ -585,12 +656,22 @@ func TestReconcile_Failure(tt *testing.T) { aclEnabled: true, errOnRoleUpdate: true, }, - expectedConditions: []v1alpha1.Condition{{ - Type: "Synced", - Status: v1.ConditionFalse, - Reason: configentries.ConsulErrorACL, - Message: "", - }}, + expectedConditions: []v1alpha1.Condition{ + { + Type: v1alpha1.ConditionSynced, + Status: v1.ConditionFalse, + Reason: registration.SyncError, + }, + { + Type: registration.ConditionRegistered, + Status: v1.ConditionTrue, + }, + { + Type: registration.ConditionACLsUpdated, + Status: v1.ConditionFalse, + Reason: registration.ConsulErrorACL, + }, + }, }, "deregistering": { registration: &v1alpha1.Registration{ @@ -600,7 +681,7 @@ func TestReconcile_Failure(tt *testing.T) { }, ObjectMeta: metav1.ObjectMeta{ Name: "test-registration", - Finalizers: []string{configentries.RegistrationFinalizer}, + Finalizers: []string{registration.RegistrationFinalizer}, DeletionTimestamp: &deletionTime, }, Spec: v1alpha1.RegistrationSpec{ @@ -633,12 +714,18 @@ func TestReconcile_Failure(tt *testing.T) { serverResponseConfig: serverResponseConfig{ errOnDeregister: true, }, - expectedConditions: []v1alpha1.Condition{{ - Type: "Synced", - Status: v1.ConditionFalse, - Reason: configentries.ConsulErrorDeregistration, - Message: "", - }}, + expectedConditions: []v1alpha1.Condition{ + { + Type: v1alpha1.ConditionSynced, + Status: v1.ConditionFalse, + Reason: registration.SyncError, + }, + { + Type: registration.ConditionDeregistered, + Status: v1.ConditionFalse, + Reason: registration.ConsulErrorDeregistration, + }, + }, }, "deregistering - ACLs enabled - terminating-gateway error updating role": { registration: &v1alpha1.Registration{ @@ -648,7 +735,7 @@ func TestReconcile_Failure(tt *testing.T) { }, ObjectMeta: metav1.ObjectMeta{ Name: "test-registration", - Finalizers: []string{configentries.RegistrationFinalizer}, + Finalizers: []string{registration.RegistrationFinalizer}, DeletionTimestamp: &deletionTime, }, Spec: v1alpha1.RegistrationSpec{ @@ -682,12 +769,22 @@ func TestReconcile_Failure(tt *testing.T) { aclEnabled: true, errOnRoleUpdate: true, }, - expectedConditions: []v1alpha1.Condition{{ - Type: "Synced", - Status: v1.ConditionFalse, - Reason: configentries.ConsulErrorACL, - Message: "", - }}, + expectedConditions: []v1alpha1.Condition{ + { + Type: v1alpha1.ConditionSynced, + Status: v1.ConditionFalse, + Reason: registration.SyncError, + }, + { + Type: registration.ConditionDeregistered, + Status: v1.ConditionTrue, + }, + { + Type: registration.ConditionACLsUpdated, + Status: v1.ConditionFalse, + Reason: registration.ConsulErrorACL, + }, + }, }, "deregistering - ACLs enabled - terminating-gateway error deleting policy": { registration: &v1alpha1.Registration{ @@ -697,7 +794,7 @@ func TestReconcile_Failure(tt *testing.T) { }, ObjectMeta: metav1.ObjectMeta{ Name: "test-registration", - Finalizers: []string{configentries.RegistrationFinalizer}, + Finalizers: []string{registration.RegistrationFinalizer}, DeletionTimestamp: &deletionTime, }, Spec: v1alpha1.RegistrationSpec{ @@ -731,12 +828,22 @@ func TestReconcile_Failure(tt *testing.T) { aclEnabled: true, errOnPolicyDelete: true, }, - expectedConditions: []v1alpha1.Condition{{ - Type: "Synced", - Status: v1.ConditionFalse, - Reason: configentries.ConsulErrorACL, - Message: "", - }}, + expectedConditions: []v1alpha1.Condition{ + { + Type: v1alpha1.ConditionSynced, + Status: v1.ConditionFalse, + Reason: registration.SyncError, + }, + { + Type: registration.ConditionDeregistered, + Status: v1.ConditionTrue, + }, + { + Type: registration.ConditionACLsUpdated, + Status: v1.ConditionFalse, + Reason: registration.ConsulErrorACL, + }, + }, }, } @@ -759,12 +866,11 @@ func TestReconcile_Failure(tt *testing.T) { WithStatusSubresource(&v1alpha1.Registration{}). Build() - controller := &configentries.RegistrationsController{ - Client: fakeClient, - Log: logrtest.NewTestLogger(t), - Scheme: s, - ConsulClientConfig: testClient.Cfg, - ConsulServerConnMgr: testClient.Watcher, + controller := ®istration.RegistrationsController{ + Client: fakeClient, + Log: logrtest.NewTestLogger(t), + Scheme: s, + Cache: registration.NewRegistrationCache(testClient.Cfg, testClient.Watcher), } _, err := controller.Reconcile(ctx, ctrl.Request{ @@ -783,7 +889,7 @@ func TestReconcile_Failure(tt *testing.T) { } } - require.ElementsMatch(t, fetchedReg.Finalizers, []string{configentries.RegistrationFinalizer}) + require.ElementsMatch(t, fetchedReg.Finalizers, []string{registration.RegistrationFinalizer}) }) } } diff --git a/control-plane/catalog/registration/result.go b/control-plane/catalog/registration/result.go new file mode 100644 index 0000000000..d267fba8c9 --- /dev/null +++ b/control-plane/catalog/registration/result.go @@ -0,0 +1,137 @@ +package registration + +import ( + "errors" + + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + "github.com/hashicorp/consul-k8s/control-plane/api/v1alpha1" +) + +// Conditions. +const ( + ConditionSynced = "Synced" + ConditionRegistered = "Registered" + ConditionDeregistered = "Deregistered" + ConditionACLsUpdated = "ACLsUpdated" +) + +// Status Reasons. +const ( + SyncError = "SyncError" + ConsulErrorRegistration = "ConsulErrorRegistration" + ConsulErrorDeregistration = "ConsulErrorDeregistration" + ConsulErrorACL = "ConsulErrorACL" + ConsulDeregistration = "ConsulDeregistration" +) + +type Result struct { + Registering bool + ConsulDeregistered bool + Sync error + Registration error + Deregistration error + ACLUpdate error + Finalizer error +} + +func (r Result) hasErrors() bool { + return r.Sync != nil || r.Registration != nil || r.ACLUpdate != nil || r.Finalizer != nil +} + +func (r Result) errors() error { + var err error + err = errors.Join(err, r.Sync, r.Registration, r.ACLUpdate, r.Finalizer) + return err +} + +func syncedCondition(result Result) v1alpha1.Condition { + if result.Sync != nil { + return v1alpha1.Condition{ + Type: ConditionSynced, + Status: corev1.ConditionFalse, + Reason: SyncError, + Message: result.Sync.Error(), + LastTransitionTime: metav1.Now(), + } + } + return v1alpha1.Condition{ + Type: ConditionSynced, + Status: corev1.ConditionTrue, + LastTransitionTime: metav1.Now(), + } +} + +func registrationCondition(result Result) v1alpha1.Condition { + if result.Registration != nil { + return v1alpha1.Condition{ + Type: ConditionRegistered, + Status: corev1.ConditionFalse, + Reason: ConsulErrorRegistration, + Message: result.Registration.Error(), + LastTransitionTime: metav1.Now(), + } + } + return v1alpha1.Condition{ + Type: ConditionRegistered, + Status: corev1.ConditionTrue, + LastTransitionTime: metav1.Now(), + } +} + +func deregistrationCondition(result Result) v1alpha1.Condition { + if result.Deregistration != nil { + return v1alpha1.Condition{ + Type: ConditionDeregistered, + Status: corev1.ConditionFalse, + Reason: ConsulErrorDeregistration, + Message: result.Deregistration.Error(), + LastTransitionTime: metav1.Now(), + } + } + + var ( + reason string + message string + ) + if result.ConsulDeregistered { + reason = ConsulDeregistration + message = "Consul deregistered this service" + } + return v1alpha1.Condition{ + Type: ConditionDeregistered, + Status: corev1.ConditionTrue, + Reason: reason, + Message: message, + LastTransitionTime: metav1.Now(), + } +} + +func aclCondition(result Result) v1alpha1.Condition { + if result.ACLUpdate != nil { + return v1alpha1.Condition{ + Type: ConditionACLsUpdated, + Status: corev1.ConditionFalse, + Reason: ConsulErrorACL, + Message: result.ACLUpdate.Error(), + LastTransitionTime: metav1.Now(), + } + } + + if result.ConsulDeregistered { + return v1alpha1.Condition{ + Type: ConditionACLsUpdated, + Status: corev1.ConditionFalse, + Reason: ConsulDeregistration, + Message: "Consul deregistered this service, acls were not removed", + LastTransitionTime: metav1.Now(), + } + } + + return v1alpha1.Condition{ + Type: ConditionACLsUpdated, + Status: corev1.ConditionTrue, + LastTransitionTime: metav1.Now(), + } +} diff --git a/control-plane/controllers/configentries/registrations_controller.go b/control-plane/controllers/configentries/registrations_controller.go deleted file mode 100644 index 86513ca909..0000000000 --- a/control-plane/controllers/configentries/registrations_controller.go +++ /dev/null @@ -1,408 +0,0 @@ -// Copyright (c) HashiCorp, Inc. -// SPDX-License-Identifier: MPL-2.0 - -package configentries - -import ( - "context" - "fmt" - "slices" - "strings" - "time" - - "github.com/go-logr/logr" - corev1 "k8s.io/api/core/v1" - k8serrors "k8s.io/apimachinery/pkg/api/errors" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/runtime" - "k8s.io/apimachinery/pkg/types" - ctrl "sigs.k8s.io/controller-runtime" - "sigs.k8s.io/controller-runtime/pkg/client" - "sigs.k8s.io/controller-runtime/pkg/handler" - "sigs.k8s.io/controller-runtime/pkg/reconcile" - - capi "github.com/hashicorp/consul/api" - "github.com/hashicorp/go-multierror" - - "github.com/hashicorp/consul-k8s/control-plane/api/v1alpha1" - "github.com/hashicorp/consul-k8s/control-plane/consul" -) - -const RegistrationFinalizer = "registration.finalizers.consul.hashicorp.com" - -// Status Reasons. -const ( - ConsulErrorRegistration = "ConsulErrorRegistration" - ConsulErrorDeregistration = "ConsulErrorDeregistration" - ConsulErrorACL = "ConsulErrorACL" -) - -// RegistrationsController is the controller for Registrations resources. -type RegistrationsController struct { - client.Client - FinalizerPatcher - Scheme *runtime.Scheme - ConsulClientConfig *consul.Config - ConsulServerConnMgr consul.ServerConnectionManager - Log logr.Logger -} - -// +kubebuilder:rbac:groups=consul.hashicorp.com,resources=servicerouters,verbs=get;list;watch;create;update;patch;delete -// +kubebuilder:rbac:groups=consul.hashicorp.com,resources=servicerouters/status,verbs=get;update;patch - -func (r *RegistrationsController) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { - log := r.Log.V(1).WithValues("registration", req.NamespacedName) - log.Info("Reconciling Registaration") - - registration := &v1alpha1.Registration{} - // get the registration - if err := r.Client.Get(ctx, req.NamespacedName, registration); err != nil { - if !k8serrors.IsNotFound(err) { - log.Error(err, "unable to get registration") - } - return ctrl.Result{}, client.IgnoreNotFound(err) - } - - client, err := consul.NewClientFromConnMgr(r.ConsulClientConfig, r.ConsulServerConnMgr) - if err != nil { - log.Error(err, "error initializing consul client") - return ctrl.Result{}, err - } - - // deletion request - if !registration.ObjectMeta.DeletionTimestamp.IsZero() { - err := r.handleDeletion(ctx, log, client, registration) - if err != nil { - return ctrl.Result{}, err - } - return ctrl.Result{}, nil - } - - // registration request - err = r.handleRegistration(ctx, log, client, registration) - if err != nil { - return ctrl.Result{}, err - } - return ctrl.Result{}, nil -} - -func (r *RegistrationsController) handleRegistration(ctx context.Context, log logr.Logger, client *capi.Client, registration *v1alpha1.Registration) error { - log.Info("Registering service") - - patch := r.AddFinalizersPatch(registration, RegistrationFinalizer) - err := r.Patch(ctx, registration, patch) - if err != nil { - return err - } - - err = r.registerService(log, client, registration) - if err != nil { - r.updateStatusError(ctx, log, registration, ConsulErrorRegistration, err) - return err - } - if r.ConsulClientConfig.APIClientConfig.Token != "" || r.ConsulClientConfig.APIClientConfig.TokenFile != "" { - err = r.updateTermGWACLRole(ctx, log, client, registration) - if err != nil { - r.updateStatusError(ctx, log, registration, ConsulErrorACL, err) - return err - } - } - err = r.updateStatus(ctx, log, registration.NamespacedName()) - if err != nil { - return err - } - return nil -} - -func (r *RegistrationsController) registerService(log logr.Logger, client *capi.Client, registration *v1alpha1.Registration) error { - regReq, err := registration.ToCatalogRegistration() - if err != nil { - return err - } - - _, err = client.Catalog().Register(regReq, nil) - if err != nil { - log.Error(err, "error registering service", "svcName", regReq.Service.Service) - return err - } - - log.Info("Successfully registered service", "svcName", regReq.Service.Service) - return nil -} - -func termGWContainsService(registration *v1alpha1.Registration) func(v1alpha1.LinkedService) bool { - return func(svc v1alpha1.LinkedService) bool { - return svc.Name == registration.Spec.Service.Name - } -} - -func (r *RegistrationsController) updateTermGWACLRole(ctx context.Context, log logr.Logger, client *capi.Client, registration *v1alpha1.Registration) error { - termGWList := &v1alpha1.TerminatingGatewayList{} - err := r.Client.List(ctx, termGWList) - if err != nil { - log.Error(err, "error listing terminating gateways") - return err - } - - termGWsToUpdate := make([]v1alpha1.TerminatingGateway, 0, len(termGWList.Items)) - for _, termGW := range termGWList.Items { - if slices.ContainsFunc(termGW.Spec.Services, termGWContainsService(registration)) { - termGWsToUpdate = append(termGWsToUpdate, termGW) - } - } - - if len(termGWsToUpdate) == 0 { - log.Info("terminating gateway not found") - return nil - } - - roles, _, err := client.ACL().RoleList(nil) - if err != nil { - log.Error(err, "error reading role list") - return err - } - - policy := &capi.ACLPolicy{ - Name: servicePolicyName(registration.Spec.Service.Name), - Description: "Write policy for terminating gateways for external service", - Rules: fmt.Sprintf(`service %q { policy = "write" }`, registration.Spec.Service.Name), - Datacenters: []string{registration.Spec.Datacenter}, - Namespace: registration.Spec.Service.Namespace, - Partition: registration.Spec.Service.Partition, - } - - existingPolicy, _, err := client.ACL().PolicyReadByName(policy.Name, nil) - if err != nil { - log.Error(err, "error reading policy") - return err - } - - if existingPolicy == nil { - policy, _, err = client.ACL().PolicyCreate(policy, nil) - if err != nil { - return fmt.Errorf("error creating policy: %w", err) - } - } else { - policy = existingPolicy - } - - mErr := &multierror.Error{} - - for _, termGW := range termGWsToUpdate { - var role *capi.ACLRole - for _, r := range roles { - if strings.HasSuffix(r.Name, fmt.Sprintf("-%s-acl-role", termGW.Name)) { - role = r - break - } - } - - if role == nil { - log.Info("terminating gateway role not found", "terminatingGatewayName", termGW.Name) - mErr = multierror.Append(mErr, fmt.Errorf("terminating gateway role not found for %q", termGW.Name)) - continue - } - - role.Policies = append(role.Policies, &capi.ACLRolePolicyLink{Name: policy.Name, ID: policy.ID}) - - _, _, err = client.ACL().RoleUpdate(role, nil) - if err != nil { - log.Error(err, "error updating role", "roleName", role.Name) - mErr = multierror.Append(mErr, fmt.Errorf("error updating role %q", role.Name)) - continue - } - } - - return mErr.ErrorOrNil() -} - -func (r *RegistrationsController) handleDeletion(ctx context.Context, log logr.Logger, client *capi.Client, registration *v1alpha1.Registration) error { - log.Info("Deregistering service") - err := r.deregisterService(log, client, registration) - if err != nil { - r.updateStatusError(ctx, log, registration, ConsulErrorDeregistration, err) - return err - } - if r.ConsulClientConfig.APIClientConfig.Token != "" || r.ConsulClientConfig.APIClientConfig.TokenFile != "" { - err = r.removeTermGWACLRole(ctx, log, client, registration) - if err != nil { - r.updateStatusError(ctx, log, registration, ConsulErrorACL, err) - return err - } - } - patch := r.RemoveFinalizersPatch(registration, RegistrationFinalizer) - err = r.Patch(ctx, registration, patch) - if err != nil { - return err - } - - return nil -} - -func (r *RegistrationsController) deregisterService(log logr.Logger, client *capi.Client, registration *v1alpha1.Registration) error { - deRegReq := registration.ToCatalogDeregistration() - - _, err := client.Catalog().Deregister(deRegReq, nil) - if err != nil { - log.Error(err, "error deregistering service", "svcID", deRegReq.ServiceID) - return err - } - - log.Info("Successfully deregistered service", "svcID", deRegReq.ServiceID) - return nil -} - -func (r *RegistrationsController) removeTermGWACLRole(ctx context.Context, log logr.Logger, client *capi.Client, registration *v1alpha1.Registration) error { - termGWList := &v1alpha1.TerminatingGatewayList{} - err := r.Client.List(ctx, termGWList) - if err != nil { - return err - } - - termGWsToUpdate := make([]v1alpha1.TerminatingGateway, 0, len(termGWList.Items)) - for _, termGW := range termGWList.Items { - if slices.ContainsFunc(termGW.Spec.Services, termGWContainsService(registration)) { - termGWsToUpdate = append(termGWsToUpdate, termGW) - } - } - - if len(termGWsToUpdate) == 0 { - log.Info("terminating gateway not found") - return nil - } - - roles, _, err := client.ACL().RoleList(nil) - if err != nil { - return err - } - - mErr := &multierror.Error{} - for _, termGW := range termGWsToUpdate { - var role *capi.ACLRole - for _, r := range roles { - if strings.HasSuffix(r.Name, fmt.Sprintf("-%s-acl-role", termGW.Name)) { - role = r - break - } - } - - if role == nil { - log.Info("terminating gateway role not found", "terminatingGatewayName", termGW.Name) - mErr = multierror.Append(mErr, fmt.Errorf("terminating gateway role not found for %q", termGW.Name)) - continue - } - - var policyID string - - expectedPolicyName := servicePolicyName(registration.Spec.Service.Name) - role.Policies = slices.DeleteFunc(role.Policies, func(i *capi.ACLRolePolicyLink) bool { - if i.Name == expectedPolicyName { - policyID = i.ID - return true - } - return false - }) - - if policyID == "" { - log.Info("policy not found on terminating gateway role", "policyName", expectedPolicyName, "terminatingGatewayName", termGW.Name) - continue - } - - _, _, err = client.ACL().RoleUpdate(role, nil) - if err != nil { - log.Error(err, "error updating role", "roleName", role.Name) - mErr = multierror.Append(mErr, fmt.Errorf("error updating role %q", role.Name)) - continue - } - - _, err = client.ACL().PolicyDelete(policyID, nil) - if err != nil { - log.Error(err, "error deleting service policy", "policyID", policyID, "policyName", expectedPolicyName) - mErr = multierror.Append(mErr, fmt.Errorf("error deleting service ACL policy %q", policyID)) - continue - } - } - - return mErr.ErrorOrNil() -} - -func servicePolicyName(name string) string { - return fmt.Sprintf("%s-write-policy", name) -} - -func (r *RegistrationsController) updateStatusError(ctx context.Context, log logr.Logger, registration *v1alpha1.Registration, reason string, reconcileErr error) { - registration.SetSyncedCondition(corev1.ConditionFalse, reason, reconcileErr.Error()) - registration.Status.LastSyncedTime = &metav1.Time{Time: time.Now()} - - err := r.Status().Update(ctx, registration) - if err != nil { - log.Error(err, "failed to update Registration status", "name", registration.Name, "namespace", registration.Namespace) - } -} - -func (r *RegistrationsController) updateStatus(ctx context.Context, log logr.Logger, req types.NamespacedName) error { - registration := &v1alpha1.Registration{} - - err := r.Get(ctx, req, registration) - if err != nil { - return err - } - - registration.Status.LastSyncedTime = &metav1.Time{Time: time.Now()} - registration.SetSyncedCondition(corev1.ConditionTrue, "", "") - - err = r.Status().Update(ctx, registration) - if err != nil { - log.Error(err, "failed to update Registration status", "name", registration.Name, "namespace", registration.Namespace) - return err - } - return nil -} - -func (r *RegistrationsController) Logger(name types.NamespacedName) logr.Logger { - return r.Log.WithValues("request", name) -} - -const registrationByServiceNameIndex = "registrationName" - -func (r *RegistrationsController) SetupWithManager(ctx context.Context, mgr ctrl.Manager) error { - // setup the index to lookup registrations by service name - if err := mgr.GetFieldIndexer().IndexField(ctx, &v1alpha1.Registration{}, registrationByServiceNameIndex, indexerFn); err != nil { - return err - } - - return ctrl.NewControllerManagedBy(mgr). - For(&v1alpha1.Registration{}). - Watches(&v1alpha1.TerminatingGateway{}, handler.EnqueueRequestsFromMapFunc(r.transformTerminatingGateways)). - Complete(r) -} - -func indexerFn(o client.Object) []string { - reg := o.(*v1alpha1.Registration) - return []string{reg.Spec.Service.Name} -} - -func (r *RegistrationsController) transformTerminatingGateways(ctx context.Context, o client.Object) []reconcile.Request { - termGW := o.(*v1alpha1.TerminatingGateway) - reqs := make([]reconcile.Request, 0, len(termGW.Spec.Services)) - for _, svc := range termGW.Spec.Services { - // lookup registrationList by service name and add it to the reconcile request - registrationList := &v1alpha1.RegistrationList{} - - err := r.Client.List(ctx, registrationList, client.MatchingFields{registrationByServiceNameIndex: svc.Name}) - if err != nil { - r.Log.Error(err, "error listing registrations by service name", "serviceName", svc.Name) - continue - } - - for _, reg := range registrationList.Items { - reqs = append(reqs, reconcile.Request{ - NamespacedName: types.NamespacedName{ - Name: reg.Name, - Namespace: reg.Namespace, - }, - }) - } - } - return reqs -} diff --git a/control-plane/go.mod b/control-plane/go.mod index 191c5b4142..195aa57c0d 100644 --- a/control-plane/go.mod +++ b/control-plane/go.mod @@ -6,6 +6,7 @@ require ( github.com/cenkalti/backoff v2.2.1+incompatible github.com/containernetworking/cni v1.1.2 github.com/deckarep/golang-set v1.7.1 + github.com/deckarep/golang-set/v2 v2.6.0 github.com/evanphx/json-patch v5.6.0+incompatible github.com/fsnotify/fsnotify v1.6.0 github.com/go-logr/logr v1.2.4 diff --git a/control-plane/go.sum b/control-plane/go.sum index 63b84f150f..09ae4ff31b 100644 --- a/control-plane/go.sum +++ b/control-plane/go.sum @@ -90,6 +90,8 @@ github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1 github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/deckarep/golang-set v1.7.1 h1:SCQV0S6gTtp6itiFrTqI+pfmJ4LN85S1YzhDf9rTHJQ= github.com/deckarep/golang-set v1.7.1/go.mod h1:93vsz/8Wt4joVM7c2AVqh+YRMiUSc14yDtF28KmMOgQ= +github.com/deckarep/golang-set/v2 v2.6.0 h1:XfcQbWM1LlMB8BsJ8N9vW5ehnnPVIw0je80NsVHagjM= +github.com/deckarep/golang-set/v2 v2.6.0/go.mod h1:VAky9rY/yGXJOLEDv3OMci+7wtDpOF4IN+y82NBOac4= github.com/denverdino/aliyungo v0.0.0-20170926055100-d3308649c661 h1:lrWnAyy/F72MbxIxFUzKmcMCdt9Oi8RzpAxzTNQHD7o= github.com/denverdino/aliyungo v0.0.0-20170926055100-d3308649c661/go.mod h1:dV8lFg6daOBZbT6/BDGIz6Y3WFGn8juu6G+CQ6LHtl0= github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ= diff --git a/control-plane/subcommand/inject-connect/v1controllers.go b/control-plane/subcommand/inject-connect/v1controllers.go index b835529f7f..268d28c824 100644 --- a/control-plane/subcommand/inject-connect/v1controllers.go +++ b/control-plane/subcommand/inject-connect/v1controllers.go @@ -16,6 +16,7 @@ import ( gatewaycontrollers "github.com/hashicorp/consul-k8s/control-plane/api-gateway/controllers" apicommon "github.com/hashicorp/consul-k8s/control-plane/api/common" "github.com/hashicorp/consul-k8s/control-plane/api/v1alpha1" + "github.com/hashicorp/consul-k8s/control-plane/catalog/registration" "github.com/hashicorp/consul-k8s/control-plane/connect-inject/controllers/endpoints" "github.com/hashicorp/consul-k8s/control-plane/connect-inject/controllers/peering" "github.com/hashicorp/consul-k8s/control-plane/connect-inject/lifecycle" @@ -285,12 +286,11 @@ func (c *Command) configureV1Controllers(ctx context.Context, mgr manager.Manage return err } - if err := (&controllers.RegistrationsController{ - Client: mgr.GetClient(), - ConsulClientConfig: consulConfig, - ConsulServerConnMgr: watcher, - Scheme: mgr.GetScheme(), - Log: ctrl.Log.WithName("controller").WithName(apicommon.Registration), + if err := (®istration.RegistrationsController{ + Client: mgr.GetClient(), + Scheme: mgr.GetScheme(), + Cache: registration.NewRegistrationCache(consulConfig, watcher), + Log: ctrl.Log.WithName("controller").WithName(apicommon.Registration), }).SetupWithManager(ctx, mgr); err != nil { setupLog.Error(err, "unable to create controller", "controller", apicommon.Registration) return err