diff --git a/internal/controllers/base_controller.go b/internal/controllers/base_controller.go new file mode 100644 index 00000000..2c1f06e2 --- /dev/null +++ b/internal/controllers/base_controller.go @@ -0,0 +1,116 @@ +package controllers + +import ( + "context" + "errors" + "fmt" + "net/http" + "time" + + "github.com/go-logr/logr" + "github.com/ngrok/ngrok-api-go/v5" + v1 "k8s.io/api/core/v1" + "k8s.io/client-go/tools/record" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" +) + +type baseControllerOp int + +const ( + createOp baseControllerOp = iota + updateOp + deleteOp +) + +type baseController[T client.Object] struct { + Kube client.Client + Log logr.Logger + Recorder record.EventRecorder + + kubeType string + statusID func(ct T) string + create func(ctx context.Context, cr T) error + update func(ctx context.Context, cr T) error + delete func(ctx context.Context, cr T) error + errResult func(op baseControllerOp, cr T, err error) (ctrl.Result, error) +} + +func (r *baseController[T]) reconcile(ctx context.Context, req ctrl.Request, cr T) (ctrl.Result, error) { + log := r.Log.WithValues(r.kubeType, req.NamespacedName) + ctx = ctrl.LoggerInto(ctx, log) + + if err := r.Kube.Get(ctx, req.NamespacedName, cr); err != nil { + return ctrl.Result{}, client.IgnoreNotFound(err) + } + + crName := req.NamespacedName.String() + if isUpsert(cr) { + if err := registerAndSyncFinalizer(ctx, r.Kube, cr); err != nil { + return ctrl.Result{}, err + } + + if r.statusID != nil && r.statusID(cr) == "" { + r.Recorder.Event(cr, v1.EventTypeNormal, "Creating", fmt.Sprintf("Creating %s: %s", r.kubeType, crName)) + if err := r.create(ctx, cr); err != nil { + r.Recorder.Event(cr, v1.EventTypeWarning, "CreateError", fmt.Sprintf("Failed to create %s %s: %s", r.kubeType, crName, err.Error())) + if r.errResult != nil { + return r.errResult(createOp, cr, err) + } + return reconcileResultFromError(err) + } + r.Recorder.Event(cr, v1.EventTypeNormal, "Created", fmt.Sprintf("Created %s: %s", r.kubeType, crName)) + } else { + r.Recorder.Event(cr, v1.EventTypeNormal, "Updating", fmt.Sprintf("Updating %s: %s", r.kubeType, crName)) + if err := r.update(ctx, cr); err != nil { + r.Recorder.Event(cr, v1.EventTypeWarning, "UpdateError", fmt.Sprintf("Failed to update %s %s: %s", r.kubeType, crName, err.Error())) + if r.errResult != nil { + return r.errResult(updateOp, cr, err) + } + return reconcileResultFromError(err) + } + r.Recorder.Event(cr, v1.EventTypeNormal, "Updated", fmt.Sprintf("Updated %s: %s", r.kubeType, crName)) + } + } else { + if hasFinalizer(cr) { + if r.statusID == nil || r.statusID(cr) != "" { + sid := r.statusID(cr) + r.Recorder.Event(cr, v1.EventTypeNormal, "Deleting", fmt.Sprintf("Deleting %s: %s", r.kubeType, crName)) + if err := r.delete(ctx, cr); err != nil { + if !ngrok.IsNotFound(err) { + r.Recorder.Event(cr, v1.EventTypeWarning, "DeleteError", fmt.Sprintf("Failed to delete %s %s: %s", r.kubeType, crName, err.Error())) + if r.errResult != nil { + return r.errResult(deleteOp, cr, err) + } + return reconcileResultFromError(err) + } + log.Info(fmt.Sprintf("%s not found, assuming it was already deleted", r.kubeType), "ID", sid) + } + r.Recorder.Event(cr, v1.EventTypeNormal, "Deleted", fmt.Sprintf("Deleted %s: %s", r.kubeType, crName)) + } + + if err := removeAndSyncFinalizer(ctx, r.Kube, cr); err != nil { + return ctrl.Result{}, err + } + } + } + + return ctrl.Result{}, nil +} + +func reconcileResultFromError(err error) (ctrl.Result, error) { + var nerr *ngrok.Error + if errors.As(err, &nerr) { + switch { + case nerr.StatusCode >= 500: + return ctrl.Result{}, err + case nerr.StatusCode == http.StatusTooManyRequests: + return ctrl.Result{RequeueAfter: time.Minute}, nil + default: + // the rest are client errors, we don't retry by default + return ctrl.Result{}, nil + } + } + + return ctrl.Result{}, err +} diff --git a/internal/controllers/domain_controller.go b/internal/controllers/domain_controller.go index d779ceb6..9e6f1a07 100644 --- a/internal/controllers/domain_controller.go +++ b/internal/controllers/domain_controller.go @@ -48,6 +48,8 @@ type DomainReconciler struct { Scheme *runtime.Scheme Recorder record.EventRecorder DomainsClient *reserved_domains.Client + + controller *baseController[*ingressv1alpha1.Domain] } // SetupWithManager sets up the controller with the Manager. @@ -56,6 +58,18 @@ func (r *DomainReconciler) SetupWithManager(mgr ctrl.Manager) error { return fmt.Errorf("DomainsClient must be set") } + r.controller = &baseController[*ingressv1alpha1.Domain]{ + Kube: r.Client, + Log: r.Log, + Recorder: r.Recorder, + + kubeType: "v1alpha1.Domain", + statusID: func(cr *ingressv1alpha1.Domain) string { return cr.Status.ID }, + create: r.create, + update: r.update, + delete: r.delete, + } + return ctrl.NewControllerManagedBy(mgr). For(&ingressv1alpha1.Domain{}). WithEventFilter(commonPredicateFilters). @@ -72,72 +86,10 @@ func (r *DomainReconciler) SetupWithManager(mgr ctrl.Manager) error { // For more details, check Reconcile and its Result here: // - https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.13.1/pkg/reconcile func (r *DomainReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { - log := r.Log.WithValues("V1Alpha1Domain", req.NamespacedName) - - domain := new(ingressv1alpha1.Domain) - if err := r.Get(ctx, req.NamespacedName, domain); err != nil { - return ctrl.Result{}, client.IgnoreNotFound(err) - } - - if domain.ObjectMeta.DeletionTimestamp.IsZero() { - if err := registerAndSyncFinalizer(ctx, r.Client, domain); err != nil { - return ctrl.Result{}, err - } - } else { - // The object is being deleted - if hasFinalizer(domain) { - if domain.Status.ID != "" { - log.Info("Deleting reserved domain", "ID", domain.Status.ID) - r.Recorder.Event(domain, v1.EventTypeNormal, "Deleting", fmt.Sprintf("Deleting Domain %s", domain.Name)) - // Question: Do we actually want to delete the reserved domains for real? Or maybe just delete the resource and have the user delete the reserved domain from - // the ngrok dashboard manually? - if err := r.DomainsClient.Delete(ctx, domain.Status.ID); err != nil { - if !ngrok.IsNotFound(err) { - r.Recorder.Event(domain, v1.EventTypeWarning, "FailedDelete", fmt.Sprintf("Failed to delete Domain %s: %s", domain.Name, err.Error())) - return ctrl.Result{}, err - } - log.Info("Domain not found, assuming it was already deleted", "ID", domain.Status.ID) - } - domain.Status.ID = "" - } - - if err := removeAndSyncFinalizer(ctx, r.Client, domain); err != nil { - return ctrl.Result{}, err - } - } - - r.Recorder.Event(domain, v1.EventTypeNormal, "Deleted", fmt.Sprintf("Deleted Domain %s", domain.Name)) - - // Stop reconciliation as the item is being deleted - return ctrl.Result{}, nil - } - - if domain.Status.ID != "" { - if err := r.updateExternalResources(ctx, domain); err != nil { - r.Recorder.Event(domain, v1.EventTypeWarning, "UpdateFailed", fmt.Sprintf("Failed to update Domain %s: %s", domain.Name, err.Error())) - return ctrl.Result{}, err - } - } else { - // Create - if err := r.createExternalResources(ctx, domain); err != nil { - r.Recorder.Event(domain, v1.EventTypeWarning, "CreateFailed", fmt.Sprintf("Failed to create Domain %s: %s", domain.Name, err.Error())) - return ctrl.Result{}, err - } - - r.Recorder.Event(domain, v1.EventTypeNormal, "Created", fmt.Sprintf("Created Domain %s", domain.Name)) - } - - return ctrl.Result{}, nil + return r.controller.reconcile(ctx, req, new(ingressv1alpha1.Domain)) } -// Deletes the external resources associated with the ReservedDomain. This is just the reserved domain itself. -// -//nolint:unused -func (r *DomainReconciler) deleteExternalResources(ctx context.Context, domain *ingressv1alpha1.Domain) error { - return r.DomainsClient.Delete(ctx, domain.Status.ID) -} - -func (r *DomainReconciler) createExternalResources(ctx context.Context, domain *ingressv1alpha1.Domain) error { +func (r *DomainReconciler) create(ctx context.Context, domain *ingressv1alpha1.Domain) error { // First check if the reserved domain already exists. The API is sometimes returning dangling CNAME records // errors right now, so we'll check if the domain already exists before trying to create it. resp, err := r.findReservedDomainByHostname(ctx, domain.Spec.Domain) @@ -162,27 +114,34 @@ func (r *DomainReconciler) createExternalResources(ctx context.Context, domain * return r.updateStatus(ctx, domain, resp) } -func (r *DomainReconciler) updateExternalResources(ctx context.Context, domain *ingressv1alpha1.Domain) error { +func (r *DomainReconciler) update(ctx context.Context, domain *ingressv1alpha1.Domain) error { resp, err := r.DomainsClient.Get(ctx, domain.Status.ID) if err != nil { return err } - if !domain.Equal(resp) { - r.Log.Info("Updating reserved domain", "ID", domain.Status.ID) - req := &ngrok.ReservedDomainUpdate{ - ID: domain.Status.ID, - Description: &domain.Spec.Description, - Metadata: &domain.Spec.Metadata, - } - resp, err = r.DomainsClient.Update(ctx, req) - if err != nil { - return err - } - return r.updateStatus(ctx, domain, resp) + if domain.Equal(resp) { + return nil } - return nil + req := &ngrok.ReservedDomainUpdate{ + ID: domain.Status.ID, + Description: &domain.Spec.Description, + Metadata: &domain.Spec.Metadata, + } + resp, err = r.DomainsClient.Update(ctx, req) + if err != nil { + return err + } + return r.updateStatus(ctx, domain, resp) +} + +func (r *DomainReconciler) delete(ctx context.Context, domain *ingressv1alpha1.Domain) error { + err := r.DomainsClient.Delete(ctx, domain.Status.ID) + if err == nil || ngrok.IsNotFound(err) { + domain.Status.ID = "" + } + return err } // finds the reserved domain by the hostname. If it doesn't exist, returns nil diff --git a/internal/controllers/httpsedge_controller.go b/internal/controllers/httpsedge_controller.go index b0293d36..0caa8aad 100644 --- a/internal/controllers/httpsedge_controller.go +++ b/internal/controllers/httpsedge_controller.go @@ -26,6 +26,7 @@ package controllers import ( "context" + "errors" "fmt" "reflect" @@ -41,7 +42,7 @@ import ( "github.com/go-logr/logr" ingressv1alpha1 "github.com/ngrok/kubernetes-ingress-controller/api/v1alpha1" - "github.com/ngrok/kubernetes-ingress-controller/internal/errors" + ierr "github.com/ngrok/kubernetes-ingress-controller/internal/errors" "github.com/ngrok/kubernetes-ingress-controller/internal/ngrokapi" "github.com/ngrok/ngrok-api-go/v5" "github.com/ngrok/ngrok-api-go/v5/backends/tunnel_group" @@ -64,10 +65,33 @@ type HTTPSEdgeReconciler struct { Recorder record.EventRecorder NgrokClientset ngrokapi.Clientset + + controller *baseController[*ingressv1alpha1.HTTPSEdge] } // SetupWithManager sets up the controller with the Manager. func (r *HTTPSEdgeReconciler) SetupWithManager(mgr ctrl.Manager) error { + r.controller = &baseController[*ingressv1alpha1.HTTPSEdge]{ + Kube: r.Client, + Log: r.Log, + Recorder: r.Recorder, + + kubeType: "v1alpha1.HTTPSEdge", + statusID: func(cr *ingressv1alpha1.HTTPSEdge) string { return cr.Status.ID }, + create: r.create, + update: r.update, + delete: r.delete, + errResult: func(op baseControllerOp, cr *ingressv1alpha1.HTTPSEdge, err error) (ctrl.Result, error) { + if errors.As(err, &ierr.ErrInvalidConfiguration{}) { + return ctrl.Result{}, nil + } + if ngrok.IsErrorCode(err, 7117) { // https://ngrok.com/docs/errors/err_ngrok_7117, domain not found + return ctrl.Result{}, err + } + return reconcileResultFromError(err) + }, + } + return ctrl.NewControllerManagedBy(mgr). For(&ingressv1alpha1.HTTPSEdge{}). WithEventFilter(commonPredicateFilters). @@ -84,124 +108,72 @@ func (r *HTTPSEdgeReconciler) SetupWithManager(mgr ctrl.Manager) error { // For more details, check Reconcile and its Result here: // - https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.13.1/pkg/reconcile func (r *HTTPSEdgeReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { - log := r.Log.WithValues("V1Alpha1HTTPSEdge", req.NamespacedName) - ctx = ctrl.LoggerInto(ctx, log) - - edge := new(ingressv1alpha1.HTTPSEdge) - if err := r.Get(ctx, req.NamespacedName, edge); err != nil { - log.Error(err, "unable to fetch Edge") - return ctrl.Result{}, client.IgnoreNotFound(err) - } + return r.controller.reconcile(ctx, req, new(ingressv1alpha1.HTTPSEdge)) +} - if edge == nil { - return ctrl.Result{}, nil +func (r *HTTPSEdgeReconciler) create(ctx context.Context, edge *ingressv1alpha1.HTTPSEdge) error { + remoteEdge, err := r.findEdgeByHostports(ctx, edge.Spec.Hostports) + if err != nil { + return err } - if edge.ObjectMeta.DeletionTimestamp.IsZero() { - if err := registerAndSyncFinalizer(ctx, r.Client, edge); err != nil { - return ctrl.Result{}, err - } - } else { - // The object is being deleted - if hasFinalizer(edge) { - if edge.Status.ID != "" { - r.Recorder.Event(edge, v1.EventTypeNormal, "Deleting", fmt.Sprintf("Deleting Edge %s", edge.Name)) - if err := r.NgrokClientset.HTTPSEdges().Delete(ctx, edge.Status.ID); err != nil { - if !ngrok.IsNotFound(err) { - r.Recorder.Event(edge, v1.EventTypeWarning, "FailedDelete", fmt.Sprintf("Failed to delete Edge %s: %s", edge.Name, err.Error())) - return ctrl.Result{}, err - } - } - edge.Status.ID = "" - } - - if err := removeAndSyncFinalizer(ctx, r.Client, edge); err != nil { - return ctrl.Result{}, err - } + if remoteEdge == nil { + remoteEdge, err = r.NgrokClientset.HTTPSEdges().Create(ctx, &ngrok.HTTPSEdgeCreate{ + Metadata: edge.Spec.Metadata, + Description: edge.Spec.Description, + Hostports: edge.Spec.Hostports, + }) + if err != nil { + return err } - - r.Recorder.Event(edge, v1.EventTypeNormal, "Deleted", fmt.Sprintf("Deleted Edge %s", edge.Name)) - - // Stop reconciliation as the item is being deleted - return ctrl.Result{}, nil } - err := r.reconcileEdge(ctx, edge) - if err != nil { - log.Error(err, "error reconciling Edge") - } - if errors.IsErrorReconcilable(err) { - return ctrl.Result{}, err - } else { - return ctrl.Result{}, nil - } + return r.upsert(ctx, edge, remoteEdge) } -func (r *HTTPSEdgeReconciler) reconcileEdge(ctx context.Context, edge *ingressv1alpha1.HTTPSEdge) error { - var remoteEdge *ngrok.HTTPSEdge - var err error - - logger := ctrl.LoggerFrom(ctx) +func (r *HTTPSEdgeReconciler) update(ctx context.Context, edge *ingressv1alpha1.HTTPSEdge) error { + remoteEdge, err := r.NgrokClientset.HTTPSEdges().Get(ctx, edge.Status.ID) + if err != nil { + return err + } - if edge.Status.ID != "" { - logger = logger.WithValues("ngrok.edge.id", edge.Status.ID) - // We already have an ID, so we can just update the resource - logger.V(1).Info("Getting existing edge") - remoteEdge, err = r.NgrokClientset.HTTPSEdges().Get(ctx, edge.Status.ID) + if !edge.Equal(remoteEdge) { + remoteEdge, err = r.NgrokClientset.HTTPSEdges().Update(ctx, &ngrok.HTTPSEdgeUpdate{ + ID: edge.Status.ID, + Metadata: &edge.Spec.Metadata, + Description: &edge.Spec.Description, + Hostports: edge.Spec.Hostports, + }) if err != nil { return err } - logger.V(1).Info("Found existing edge") - - if !edge.Equal(remoteEdge) { - logger.Info("Updating edge") - remoteEdge, err = r.NgrokClientset.HTTPSEdges().Update(ctx, &ngrok.HTTPSEdgeUpdate{ - ID: edge.Status.ID, - Metadata: &edge.Spec.Metadata, - Description: &edge.Spec.Description, - Hostports: edge.Spec.Hostports, - }) - if err != nil { - return err - } - logger.Info("Updated edge") - } - } else { - logger.Info("Searching for existing edge by hostports", "hostports", edge.Spec.Hostports) - remoteEdge, err = r.findEdgeByHostports(ctx, edge.Spec.Hostports) - if err != nil { - return err - } - - // Not found, so create it - if remoteEdge == nil { - logger.Info("No existing edge found. Creating new edge") - remoteEdge, err = r.NgrokClientset.HTTPSEdges().Create(ctx, &ngrok.HTTPSEdgeCreate{ - Metadata: edge.Spec.Metadata, - Description: edge.Spec.Description, - Hostports: edge.Spec.Hostports, - }) - if err != nil { - return err - } - logger.Info("Created new edge", "ngrok.edge.id", remoteEdge.ID) - } else { - logger.Info("Found existing edge", "ngrok.edge.id", remoteEdge.ID) - } - logger = logger.WithValues("ngrok.edge.id", remoteEdge.ID) } - ctx = ctrl.LoggerInto(ctx, logger) + return r.upsert(ctx, edge, remoteEdge) +} - if err = r.updateStatus(ctx, edge, remoteEdge); err != nil { +func (r *HTTPSEdgeReconciler) upsert(ctx context.Context, edge *ingressv1alpha1.HTTPSEdge, remoteEdge *ngrok.HTTPSEdge) error { + if err := r.updateStatus(ctx, edge, remoteEdge); err != nil { return err } - if err = r.reconcileRoutes(ctx, edge, remoteEdge); err != nil { + if err := r.reconcileRoutes(ctx, edge, remoteEdge); err != nil { return err } - return r.setEdgeTLSTermination(ctx, remoteEdge, edge.Spec.TLSTermination) + if err := r.setEdgeTLSTermination(ctx, remoteEdge, edge.Spec.TLSTermination); err != nil { + return err + } + + return nil +} + +func (r *HTTPSEdgeReconciler) delete(ctx context.Context, edge *ingressv1alpha1.HTTPSEdge) error { + err := r.NgrokClientset.HTTPSEdges().Delete(ctx, edge.Status.ID) + if err == nil || ngrok.IsNotFound(err) { + edge.Status.ID = "" + } + return err } // TODO: This is going to be a bit messy right now, come back and make this cleaner @@ -814,7 +786,7 @@ func (u *edgeRouteModuleUpdater) setEdgeRouteOAuth(ctx context.Context, route *n } if module == nil { - return errors.NewErrInvalidConfiguration(fmt.Errorf("no OAuth provider configured")) + return ierr.NewErrInvalidConfiguration(fmt.Errorf("no OAuth provider configured")) } if reflect.DeepEqual(module, route.OAuth) { @@ -851,7 +823,7 @@ func (u *edgeRouteModuleUpdater) setEdgeRouteOIDC(ctx context.Context, route *ng return err } if clientSecret == nil { - return errors.NewErrMissingRequiredSecret("missing clientSecret for OIDC") + return ierr.NewErrMissingRequiredSecret("missing clientSecret for OIDC") } module := ngrok.EndpointOIDC{ diff --git a/internal/controllers/ingress_controller.go b/internal/controllers/ingress_controller.go index c536c535..088cfbc3 100644 --- a/internal/controllers/ingress_controller.go +++ b/internal/controllers/ingress_controller.go @@ -109,7 +109,7 @@ func (irec *IngressReconciler) Reconcile(ctx context.Context, req ctrl.Request) return ctrl.Result{}, err } - if ingress.ObjectMeta.DeletionTimestamp.IsZero() { + if isUpsert(ingress) { // The object is not being deleted, so register and sync finalizer if err := registerAndSyncFinalizer(ctx, irec.Client, ingress); err != nil { log.Error(err, "Failed to register finalizer") diff --git a/internal/controllers/ippolicy_controller.go b/internal/controllers/ippolicy_controller.go index 5fb238d0..c249928c 100644 --- a/internal/controllers/ippolicy_controller.go +++ b/internal/controllers/ippolicy_controller.go @@ -57,6 +57,8 @@ type IPPolicyReconciler struct { IPPoliciesClient *ip_policies.Client IPPolicyRulesClient *ip_policy_rules.Client + + controller *baseController[*ingressv1alpha1.IPPolicy] } // SetupWithManager sets up the controller with the Manager. @@ -68,6 +70,18 @@ func (r *IPPolicyReconciler) SetupWithManager(mgr ctrl.Manager) error { return fmt.Errorf("IPPolicyRulesClient must be set") } + r.controller = &baseController[*ingressv1alpha1.IPPolicy]{ + Kube: r.Client, + Log: r.Log, + Recorder: r.Recorder, + + kubeType: "v1alpha1.IPPolicy", + statusID: func(cr *ingressv1alpha1.IPPolicy) string { return cr.Status.ID }, + create: r.create, + update: r.update, + delete: r.delete, + } + return ctrl.NewControllerManagedBy(mgr). For(&ingressv1alpha1.IPPolicy{}). Complete(r) @@ -83,72 +97,22 @@ func (r *IPPolicyReconciler) SetupWithManager(mgr ctrl.Manager) error { // For more details, check Reconcile and its Result here: // - https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.14.1/pkg/reconcile func (r *IPPolicyReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { - log := r.Log.WithValues("V1Alpha1IPPolicy", req.NamespacedName) - - policy := new(ingressv1alpha1.IPPolicy) - if err := r.Get(ctx, req.NamespacedName, policy); err != nil { - return ctrl.Result{}, client.IgnoreNotFound(err) - } - - if policy.ObjectMeta.DeletionTimestamp.IsZero() { - if err := registerAndSyncFinalizer(ctx, r.Client, policy); err != nil { - return ctrl.Result{}, err - } - } else { - // The object is being deleted - if hasFinalizer(policy) { - if policy.Status.ID != "" { - log.Info("Deleting IP Policy", "ID", policy.Status.ID) - r.Recorder.Event(policy, v1.EventTypeNormal, "Deleting", fmt.Sprintf("Deleting policy %s", policy.Name)) - if err := r.IPPoliciesClient.Delete(ctx, policy.Status.ID); err != nil { - if !ngrok.IsNotFound(err) { - r.Recorder.Event(policy, v1.EventTypeWarning, "FailedDelete", fmt.Sprintf("Failed to delete IPPolicy %s: %s", policy.Name, err.Error())) - return ctrl.Result{}, err - } - log.Info("Domain not found, assuming it was already deleted", "ID", policy.Status.ID) - } - policy.Status.ID = "" - } - - if err := removeAndSyncFinalizer(ctx, r.Client, policy); err != nil { - return ctrl.Result{}, err - } - } - - r.Recorder.Event(policy, v1.EventTypeNormal, "Deleted", fmt.Sprintf("Deleted IPPolicy %s", policy.Name)) - - // Stop reconciliation as the item is being deleted - return ctrl.Result{}, nil - } - - if err := r.createOrUpdateIPPolicy(ctx, policy); err != nil { - return ctrl.Result{}, err - } - return ctrl.Result{}, r.createOrUpdateIPPolicyRules(ctx, policy) -} - -//nolint:unused -func (r *IPPolicyReconciler) deleteRemoteResoures(ctx context.Context, policy *ingressv1alpha1.IPPolicy) error { - return r.IPPoliciesClient.Delete(ctx, policy.Status.ID) + return r.controller.reconcile(ctx, req, new(ingressv1alpha1.IPPolicy)) } -func (r *IPPolicyReconciler) createOrUpdateIPPolicy(ctx context.Context, policy *ingressv1alpha1.IPPolicy) error { - if policy.Status.ID == "" { - r.Recorder.Event(policy, v1.EventTypeNormal, "Creating", fmt.Sprintf("Creating IPPolicy %s", policy.Name)) - // Create the IP Policy since it doesn't exist - remotePolicy, err := r.IPPoliciesClient.Create(ctx, &ngrok.IPPolicyCreate{ - Description: policy.Spec.Description, - Metadata: policy.Spec.Metadata, - }) - if err != nil { - return err - } - r.Recorder.Event(policy, v1.EventTypeNormal, "Created", fmt.Sprintf("Created IPPolicy %s", policy.Name)) - policy.Status.ID = remotePolicy.ID - return r.Status().Update(ctx, policy) +func (r *IPPolicyReconciler) create(ctx context.Context, policy *ingressv1alpha1.IPPolicy) error { + remotePolicy, err := r.IPPoliciesClient.Create(ctx, &ngrok.IPPolicyCreate{ + Description: policy.Spec.Description, + Metadata: policy.Spec.Metadata, + }) + if err != nil { + return err } + policy.Status.ID = remotePolicy.ID + return r.Status().Update(ctx, policy) +} - // Update the IP Policy since it already exists +func (r *IPPolicyReconciler) update(ctx context.Context, policy *ingressv1alpha1.IPPolicy) error { remotePolicy, err := r.IPPoliciesClient.Get(ctx, policy.Status.ID) if err != nil { if ngrok.IsNotFound(err) { @@ -174,6 +138,14 @@ func (r *IPPolicyReconciler) createOrUpdateIPPolicy(ctx context.Context, policy return nil } +func (r *IPPolicyReconciler) delete(ctx context.Context, policy *ingressv1alpha1.IPPolicy) error { + err := r.IPPoliciesClient.Delete(ctx, policy.Status.ID) + if err == nil || ngrok.IsNotFound(err) { + policy.Status.ID = "" + } + return err +} + func (r *IPPolicyReconciler) createOrUpdateIPPolicyRules(ctx context.Context, policy *ingressv1alpha1.IPPolicy) error { remoteRules, err := r.getRemotePolicyRules(ctx, policy.Status.ID) if err != nil { diff --git a/internal/controllers/tcpedge_controller.go b/internal/controllers/tcpedge_controller.go index 05dcd527..fb0e9aaf 100644 --- a/internal/controllers/tcpedge_controller.go +++ b/internal/controllers/tcpedge_controller.go @@ -29,7 +29,6 @@ import ( "fmt" "reflect" - v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/tools/record" @@ -57,12 +56,26 @@ type TCPEdgeReconciler struct { ipPolicyResolver NgrokClientset ngrokapi.Clientset + + controller *baseController[*ingressv1alpha1.TCPEdge] } // SetupWithManager sets up the controller with the Manager. func (r *TCPEdgeReconciler) SetupWithManager(mgr ctrl.Manager) error { r.ipPolicyResolver = ipPolicyResolver{client: mgr.GetClient()} + r.controller = &baseController[*ingressv1alpha1.TCPEdge]{ + Kube: r.Client, + Log: r.Log, + Recorder: r.Recorder, + + kubeType: "v1alpha1.TCPEdge", + statusID: func(cr *ingressv1alpha1.TCPEdge) string { return cr.Status.ID }, + create: r.create, + update: r.update, + delete: r.delete, + } + return ctrl.NewControllerManagedBy(mgr). For(&ingressv1alpha1.TCPEdge{}). Watches( @@ -82,52 +95,97 @@ func (r *TCPEdgeReconciler) SetupWithManager(mgr ctrl.Manager) error { // For more details, check Reconcile and its Result here: // - https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.13.1/pkg/reconcile func (r *TCPEdgeReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { - log := r.Log.WithValues("V1Alpha1TCPEdge", req.NamespacedName) + return r.controller.reconcile(ctx, req, new(ingressv1alpha1.TCPEdge)) +} - edge := new(ingressv1alpha1.TCPEdge) - if err := r.Get(ctx, req.NamespacedName, edge); err != nil { - log.Error(err, "unable to fetch Edge") - return ctrl.Result{}, client.IgnoreNotFound(err) +func (r *TCPEdgeReconciler) create(ctx context.Context, edge *ingressv1alpha1.TCPEdge) error { + if err := r.reconcileTunnelGroupBackend(ctx, edge); err != nil { + return err } - if edge.ObjectMeta.DeletionTimestamp.IsZero() { - if err := registerAndSyncFinalizer(ctx, r.Client, edge); err != nil { - return ctrl.Result{}, err - } - } else { - // The object is being deleted - if hasFinalizer(edge) { - if edge.Status.ID != "" { - r.Recorder.Event(edge, v1.EventTypeNormal, "Deleting", fmt.Sprintf("Deleting Edge %s", edge.Name)) - if err := r.NgrokClientset.TCPEdges().Delete(ctx, edge.Status.ID); err != nil { - if !ngrok.IsNotFound(err) { - r.Recorder.Event(edge, v1.EventTypeWarning, "FailedDelete", fmt.Sprintf("Failed to delete Edge %s: %s", edge.Name, err.Error())) - return ctrl.Result{}, err - } - } - edge.Status.ID = "" - } + if err := r.reserveAddrIfEmpty(ctx, edge); err != nil { + return err + } - if err := removeAndSyncFinalizer(ctx, r.Client, edge); err != nil { - return ctrl.Result{}, err - } - } + // Try to find the edge by the backend labels + resp, err := r.findEdgeByBackendLabels(ctx, edge.Spec.Backend.Labels) + if err != nil { + return err + } + + if resp != nil { + return r.updateEdgeStatus(ctx, edge, resp) + } + + // No edge has been created for this edge, create one + r.Log.Info("Creating new TCPEdge", "namespace", edge.Namespace, "name", edge.Name) + resp, err = r.NgrokClientset.TCPEdges().Create(ctx, &ngrok.TCPEdgeCreate{ + Description: edge.Spec.Description, + Metadata: edge.Spec.Metadata, + Backend: &ngrok.EndpointBackendMutate{ + BackendID: edge.Status.Backend.ID, + }, + }) + if err != nil { + return err + } + r.Log.Info("Created new TCPEdge", "edge.ID", resp.ID, "name", edge.Name, "namespace", edge.Namespace) - r.Recorder.Event(edge, v1.EventTypeNormal, "Deleted", fmt.Sprintf("Deleted Edge %s", edge.Name)) - return ctrl.Result{}, nil + if err := r.updateEdgeStatus(ctx, edge, resp); err != nil { + return err } + return r.updateIPRestrictionRouteModule(ctx, edge, resp) +} + +func (r *TCPEdgeReconciler) update(ctx context.Context, edge *ingressv1alpha1.TCPEdge) error { if err := r.reconcileTunnelGroupBackend(ctx, edge); err != nil { - log.Error(err, "unable to reconcile tunnel group backend", "backend.id", edge.Status.Backend.ID) - return ctrl.Result{}, err + return err } if err := r.reserveAddrIfEmpty(ctx, edge); err != nil { - log.Error(err, "unable to create tcp address") - return ctrl.Result{}, err + return err + } + + resp, err := r.NgrokClientset.TCPEdges().Get(ctx, edge.Status.ID) + if err != nil { + // If we can't find the edge in the ngrok API, it's been deleted, so clear the ID + // and requeue the edge. When it gets reconciled again, it will be recreated. + if ngrok.IsNotFound(err) { + r.Log.Info("TCPEdge not found, clearing ID and requeuing", "edge.ID", edge.Status.ID) + edge.Status.ID = "" + //nolint:errcheck + r.Status().Update(ctx, edge) + } + return err + } + + // If the backend or hostports do not match, update the edge with the desired backend and hostports + if resp.Backend.Backend.ID != edge.Status.Backend.ID || + !reflect.DeepEqual(resp.Hostports, edge.Status.Hostports) { + resp, err = r.NgrokClientset.TCPEdges().Update(ctx, &ngrok.TCPEdgeUpdate{ + ID: resp.ID, + Description: pointer.String(edge.Spec.Description), + Metadata: pointer.String(edge.Spec.Metadata), + Hostports: edge.Status.Hostports, + Backend: &ngrok.EndpointBackendMutate{ + BackendID: edge.Status.Backend.ID, + }, + }) + if err != nil { + return err + } } - return ctrl.Result{}, r.reconcileEdge(ctx, edge) + return r.updateEdgeStatus(ctx, edge, resp) +} + +func (r *TCPEdgeReconciler) delete(ctx context.Context, edge *ingressv1alpha1.TCPEdge) error { + err := r.NgrokClientset.TCPEdges().Delete(ctx, edge.Status.ID) + if err == nil || ngrok.IsNotFound(err) { + edge.Status.ID = "" + } + return err } func (r *TCPEdgeReconciler) reconcileTunnelGroupBackend(ctx context.Context, edge *ingressv1alpha1.TCPEdge) error { @@ -175,74 +233,6 @@ func (r *TCPEdgeReconciler) reconcileTunnelGroupBackend(ctx context.Context, edg return r.Status().Update(ctx, edge) } -func (r *TCPEdgeReconciler) reconcileEdge(ctx context.Context, edge *ingressv1alpha1.TCPEdge) error { - if edge.Status.ID != "" { - // An edge already exists, make sure everything matches - resp, err := r.NgrokClientset.TCPEdges().Get(ctx, edge.Status.ID) - if err != nil { - // If we can't find the edge in the ngrok API, it's been deleted, so clear the ID - // and requeue the edge. When it gets reconciled again, it will be recreated. - if ngrok.IsNotFound(err) { - r.Log.Info("TCPEdge not found, clearing ID and requeuing", "edge.ID", edge.Status.ID) - edge.Status.ID = "" - //nolint:errcheck - r.Status().Update(ctx, edge) - } - return err - } - - // If the backend or hostports do not match, update the edge with the desired backend and hostports - if resp.Backend.Backend.ID != edge.Status.Backend.ID || - !reflect.DeepEqual(resp.Hostports, edge.Status.Hostports) { - resp, err = r.NgrokClientset.TCPEdges().Update(ctx, &ngrok.TCPEdgeUpdate{ - ID: resp.ID, - Description: pointer.String(edge.Spec.Description), - Metadata: pointer.String(edge.Spec.Metadata), - Hostports: edge.Status.Hostports, - Backend: &ngrok.EndpointBackendMutate{ - BackendID: edge.Status.Backend.ID, - }, - }) - if err != nil { - return err - } - } - - return r.updateEdgeStatus(ctx, edge, resp) - } - - // Try to find the edge by the backend labels - resp, err := r.findEdgeByBackendLabels(ctx, edge.Spec.Backend.Labels) - if err != nil { - return err - } - - if resp != nil { - return r.updateEdgeStatus(ctx, edge, resp) - } - - // No edge has been created for this edge, create one - r.Log.Info("Creating new TCPEdge", "namespace", edge.Namespace, "name", edge.Name) - resp, err = r.NgrokClientset.TCPEdges().Create(ctx, &ngrok.TCPEdgeCreate{ - Description: edge.Spec.Description, - Metadata: edge.Spec.Metadata, - Backend: &ngrok.EndpointBackendMutate{ - BackendID: edge.Status.Backend.ID, - }, - }) - if err != nil { - return err - } - r.Log.Info("Created new TCPEdge", "edge.ID", resp.ID, "name", edge.Name, "namespace", edge.Namespace) - - if err := r.updateEdgeStatus(ctx, edge, resp); err != nil { - return err - } - - return r.updateIPRestrictionRouteModule(ctx, edge, resp) - -} - func (r *TCPEdgeReconciler) findEdgeByBackendLabels(ctx context.Context, backendLabels map[string]string) (*ngrok.TCPEdge, error) { r.Log.Info("Searching for existing TCPEdge with backend labels", "labels", backendLabels) iter := r.NgrokClientset.TCPEdges().List(&ngrok.Paging{}) diff --git a/internal/controllers/tunnel_controller.go b/internal/controllers/tunnel_controller.go index b2d75255..e9550179 100644 --- a/internal/controllers/tunnel_controller.go +++ b/internal/controllers/tunnel_controller.go @@ -31,7 +31,6 @@ import ( "github.com/go-logr/logr" ingressv1alpha1 "github.com/ngrok/kubernetes-ingress-controller/api/v1alpha1" "github.com/ngrok/kubernetes-ingress-controller/pkg/tunneldriver" - v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/client-go/tools/record" ctrl "sigs.k8s.io/controller-runtime" @@ -50,6 +49,8 @@ type TunnelReconciler struct { Scheme *runtime.Scheme Recorder record.EventRecorder TunnelDriver *tunneldriver.TunnelDriver + + controller *baseController[*ingressv1alpha1.Tunnel] } // SetupWithManager sets up the controller with the Manager @@ -60,6 +61,16 @@ func (r *TunnelReconciler) SetupWithManager(mgr ctrl.Manager) error { return fmt.Errorf("TunnelDriver is nil") } + r.controller = &baseController[*ingressv1alpha1.Tunnel]{ + Kube: r.Client, + Log: r.Log, + Recorder: r.Recorder, + + kubeType: "v1alpha1.Tunnel", + update: r.update, + delete: r.delete, + } + cont, err := controller.NewUnmanaged("tunnel-controller", mgr, controller.Options{ Reconciler: r, LogConstructor: func(_ *reconcile.Request) logr.Logger { @@ -93,35 +104,15 @@ func (r *TunnelReconciler) SetupWithManager(mgr ctrl.Manager) error { // For more details, check Reconcile and its Result here: // - https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.13.1/pkg/reconcile func (r *TunnelReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { - log := r.Log.WithValues("V1Alpha1Tunnel", req.NamespacedName) - ctx = ctrl.LoggerInto(ctx, log) - - tunnel := &ingressv1alpha1.Tunnel{} - - if err := r.Client.Get(ctx, req.NamespacedName, tunnel); err != nil { - return ctrl.Result{}, client.IgnoreNotFound(err) - } - - tunnelName := req.NamespacedName.String() - - if isDelete(tunnel.ObjectMeta) { - r.Recorder.Event(tunnel, v1.EventTypeNormal, "Deleting", fmt.Sprintf("Deleting tunnel %s", tunnelName)) - err := r.TunnelDriver.DeleteTunnel(ctx, tunnelName) - if err != nil { - r.Recorder.Event(tunnel, v1.EventTypeWarning, "DeleteError", fmt.Sprintf("Failed to delete tunnel %s: %s", tunnelName, err.Error())) - return ctrl.Result{}, err - } - r.Recorder.Event(tunnel, v1.EventTypeNormal, "Deleted", fmt.Sprintf("Deleted tunnel %s", tunnelName)) - return ctrl.Result{}, nil - } + return r.controller.reconcile(ctx, req, new(ingressv1alpha1.Tunnel)) +} - r.Recorder.Event(tunnel, v1.EventTypeNormal, "Creating", fmt.Sprintf("Creating tunnel %s", tunnelName)) - err := r.TunnelDriver.CreateTunnel(ctx, tunnelName, tunnel.Spec.Labels, tunnel.Spec.BackendConfig, tunnel.Spec.ForwardsTo) - if err != nil { - r.Recorder.Event(tunnel, v1.EventTypeWarning, "CreateError", fmt.Sprintf("Failed to create tunnel %s: %s", tunnelName, err.Error())) - return ctrl.Result{}, err - } - r.Recorder.Event(tunnel, v1.EventTypeNormal, "Created", fmt.Sprintf("Created tunnel %s", tunnelName)) +func (r *TunnelReconciler) update(ctx context.Context, tunnel *ingressv1alpha1.Tunnel) error { + tunnelName := fmt.Sprintf("%s/%s", tunnel.Namespace, tunnel.Name) + return r.TunnelDriver.CreateTunnel(ctx, tunnelName, tunnel.Spec.Labels, tunnel.Spec.BackendConfig, tunnel.Spec.ForwardsTo) +} - return ctrl.Result{}, nil +func (r *TunnelReconciler) delete(ctx context.Context, tunnel *ingressv1alpha1.Tunnel) error { + tunnelName := fmt.Sprintf("%s/%s", tunnel.Namespace, tunnel.Name) + return r.TunnelDriver.DeleteTunnel(ctx, tunnelName) } diff --git a/internal/controllers/utils.go b/internal/controllers/utils.go index 397b591d..e56ef61d 100644 --- a/internal/controllers/utils.go +++ b/internal/controllers/utils.go @@ -7,7 +7,6 @@ import ( ingressv1alpha1 "github.com/ngrok/kubernetes-ingress-controller/api/v1alpha1" v1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" @@ -17,8 +16,12 @@ const ( finalizerName = "k8s.ngrok.com/finalizer" ) -func isDelete(meta metav1.ObjectMeta) bool { - return meta.DeletionTimestamp != nil && !meta.DeletionTimestamp.IsZero() +func isUpsert(o client.Object) bool { + return o.GetDeletionTimestamp().IsZero() +} + +func isDelete(o client.Object) bool { + return !o.GetDeletionTimestamp().IsZero() } func hasFinalizer(o client.Object) bool { diff --git a/internal/errors/errors.go b/internal/errors/errors.go index a4da5b9a..0e8f3e2d 100644 --- a/internal/errors/errors.go +++ b/internal/errors/errors.go @@ -1,11 +1,9 @@ package errors import ( - "errors" "fmt" "strings" - "github.com/ngrok/ngrok-api-go/v5" netv1 "k8s.io/api/networking/v1" ) @@ -153,20 +151,3 @@ func (e ErrInvalidConfiguration) Error() string { func (e ErrInvalidConfiguration) Unwrap() error { return e.cause } - -func IsErrorReconcilable(err error) bool { - if err == nil { - return true - } - - if errors.As(err, &ErrInvalidConfiguration{}) { - return false - } - - var nerr *ngrok.Error - if errors.As(err, &nerr) { - return nerr.StatusCode >= 500 - } - - return true -}