diff --git a/pkg/store/store.go b/pkg/store/store.go index 4e36a2a7db..75f649b80f 100644 --- a/pkg/store/store.go +++ b/pkg/store/store.go @@ -221,7 +221,49 @@ func (c CacheStores) Add(obj runtime.Object) error { case *knative.Ingress: return c.KnativeIngress.Add(obj) default: - return fmt.Errorf("cannot add kind %q to the store", obj.GetObjectKind().GroupVersionKind()) + return fmt.Errorf("cannot add unsupported kind %q to the store", obj.GetObjectKind().GroupVersionKind()) + } +} + +// Delete removes a provided runtime.Object from the CacheStore if it's of a supported type. +// The CacheStore must be initialized (see NewCacheStores()) or this will panic. +func (c CacheStores) Delete(obj runtime.Object) error { + switch obj := obj.(type) { + // ---------------------------------------------------------------------------- + // Kubernetes Core API Support + // ---------------------------------------------------------------------------- + case *extensions.Ingress: + return c.IngressV1beta1.Delete(obj) + case *networkingv1.Ingress: + return c.IngressV1.Delete(obj) + case *corev1.Service: + return c.Service.Delete(obj) + case *corev1.Secret: + return c.Secret.Delete(obj) + case *corev1.Endpoints: + return c.Endpoint.Delete(obj) + // ---------------------------------------------------------------------------- + // Kong API Support + // ---------------------------------------------------------------------------- + case *kongv1.KongPlugin: + return c.Plugin.Delete(obj) + case *kongv1.KongClusterPlugin: + return c.ClusterPlugin.Delete(obj) + case *kongv1.KongConsumer: + return c.Consumer.Delete(obj) + case *kongv1.KongIngress: + return c.KongIngress.Delete(obj) + case *kongv1beta1.TCPIngress: + return c.TCPIngress.Delete(obj) + case *kongv1alpha1.UDPIngress: + return c.UDPIngress.Delete(obj) + // ---------------------------------------------------------------------------- + // 3rd Party API Support + // ---------------------------------------------------------------------------- + case *knative.Ingress: + return c.KnativeIngress.Delete(obj) + default: + return fmt.Errorf("cannot delete unsupported kind %q from the store", obj.GetObjectKind().GroupVersionKind()) } } diff --git a/railgun/controllers/configuration/zz_generated_controllers.go b/railgun/controllers/configuration/zz_generated_controllers.go index 56fe567d21..2261ff3f07 100644 --- a/railgun/controllers/configuration/zz_generated_controllers.go +++ b/railgun/controllers/configuration/zz_generated_controllers.go @@ -24,6 +24,7 @@ import ( "github.com/go-logr/logr" + corev1 "k8s.io/api/core/v1" extv1beta1 "k8s.io/api/extensions/v1beta1" netv1 "k8s.io/api/networking/v1" netv1beta1 "k8s.io/api/networking/v1beta1" @@ -36,9 +37,133 @@ import ( kongv1beta1 "github.com/kong/kubernetes-ingress-controller/railgun/apis/configuration/v1beta1" "github.com/kong/kubernetes-ingress-controller/railgun/internal/ctrlutils" - "github.com/kong/kubernetes-ingress-controller/railgun/internal/mgrutils" + "github.com/kong/kubernetes-ingress-controller/railgun/internal/proxy" ) +// ----------------------------------------------------------------------------- +// CoreV1 Service +// ----------------------------------------------------------------------------- + +// CoreV1Service reconciles a Ingress object +type CoreV1ServiceReconciler struct { + client.Client + + Log logr.Logger + Scheme *runtime.Scheme + Proxy proxy.Proxy +} + +// SetupWithManager sets up the controller with the Manager. +func (r *CoreV1ServiceReconciler) SetupWithManager(mgr ctrl.Manager) error { + return ctrl.NewControllerManagedBy(mgr).For(&corev1.Service{}).Complete(r) +} + +//+kubebuilder:rbac:groups=v1,resources=services,verbs=get;list;watch;create;update;patch;delete +//+kubebuilder:rbac:groups=v1,resources=services/status,verbs=get;update;patch +//+kubebuilder:rbac:groups=v1,resources=services/finalizers,verbs=update + +// Reconcile processes the watched objects +func (r *CoreV1ServiceReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { + log := r.Log.WithValues("CoreV1Service", req.NamespacedName) + + // get the relevant object + obj := new(corev1.Service) + if err := r.Get(ctx, req.NamespacedName, obj); err != nil { + return ctrl.Result{}, client.IgnoreNotFound(err) + } + log.Info("reconciling resource", "namespace", req.Namespace, "name", req.Name) + + // clean the object up if it's being deleted + if !obj.DeletionTimestamp.IsZero() && time.Now().After(obj.DeletionTimestamp.Time) { + log.Info("resource is being deleted, its configuration will be removed", "type", "Service", "namespace", req.Namespace, "name", req.Name) + if err := r.Proxy.DeleteObject(obj); err != nil { + return ctrl.Result{}, err + } + return ctrlutils.CleanupFinalizer(ctx, r.Client, log, req.NamespacedName, obj) + } + + // before we store cache data for this object, ensure that it has our finalizer set + if !ctrlutils.HasFinalizer(obj, ctrlutils.KongIngressFinalizer) { + log.Info("finalizer is not set for ingress object, setting it", req.Namespace, req.Name) + finalizers := obj.GetFinalizers() + obj.SetFinalizers(append(finalizers, ctrlutils.KongIngressFinalizer)) + if err := r.Client.Update(ctx, obj); err != nil { + return ctrl.Result{}, err + } + return ctrl.Result{Requeue: true}, nil + } + + // update the kong Admin API with the changes + log.Info("updating the proxy with new Service", "namespace", obj.Namespace, "name", obj.Name) + if err := r.Proxy.UpdateObject(obj); err != nil { + return ctrl.Result{}, err + } + + return ctrl.Result{}, nil +} + +// ----------------------------------------------------------------------------- +// CoreV1 Endpoints +// ----------------------------------------------------------------------------- + +// CoreV1Endpoints reconciles a Ingress object +type CoreV1EndpointsReconciler struct { + client.Client + + Log logr.Logger + Scheme *runtime.Scheme + Proxy proxy.Proxy +} + +// SetupWithManager sets up the controller with the Manager. +func (r *CoreV1EndpointsReconciler) SetupWithManager(mgr ctrl.Manager) error { + return ctrl.NewControllerManagedBy(mgr).For(&corev1.Endpoints{}).Complete(r) +} + +//+kubebuilder:rbac:groups=v1,resources=endpoints,verbs=get;list;watch;create;update;patch;delete +//+kubebuilder:rbac:groups=v1,resources=endpoints/status,verbs=get;update;patch +//+kubebuilder:rbac:groups=v1,resources=endpoints/finalizers,verbs=update + +// Reconcile processes the watched objects +func (r *CoreV1EndpointsReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { + log := r.Log.WithValues("CoreV1Endpoints", req.NamespacedName) + + // get the relevant object + obj := new(corev1.Endpoints) + if err := r.Get(ctx, req.NamespacedName, obj); err != nil { + return ctrl.Result{}, client.IgnoreNotFound(err) + } + log.Info("reconciling resource", "namespace", req.Namespace, "name", req.Name) + + // clean the object up if it's being deleted + if !obj.DeletionTimestamp.IsZero() && time.Now().After(obj.DeletionTimestamp.Time) { + log.Info("resource is being deleted, its configuration will be removed", "type", "Endpoints", "namespace", req.Namespace, "name", req.Name) + if err := r.Proxy.DeleteObject(obj); err != nil { + return ctrl.Result{}, err + } + return ctrlutils.CleanupFinalizer(ctx, r.Client, log, req.NamespacedName, obj) + } + + // before we store cache data for this object, ensure that it has our finalizer set + if !ctrlutils.HasFinalizer(obj, ctrlutils.KongIngressFinalizer) { + log.Info("finalizer is not set for ingress object, setting it", req.Namespace, req.Name) + finalizers := obj.GetFinalizers() + obj.SetFinalizers(append(finalizers, ctrlutils.KongIngressFinalizer)) + if err := r.Client.Update(ctx, obj); err != nil { + return ctrl.Result{}, err + } + return ctrl.Result{Requeue: true}, nil + } + + // update the kong Admin API with the changes + log.Info("updating the proxy with new Endpoints", "namespace", obj.Namespace, "name", obj.Name) + if err := r.Proxy.UpdateObject(obj); err != nil { + return ctrl.Result{}, err + } + + return ctrl.Result{}, nil +} + // ----------------------------------------------------------------------------- // NetV1 Ingress // ----------------------------------------------------------------------------- @@ -49,8 +174,7 @@ type NetV1IngressReconciler struct { Log logr.Logger Scheme *runtime.Scheme - - ProxyUpdateParams ctrlutils.ProxyUpdateParams + Proxy proxy.Proxy } // SetupWithManager sets up the controller with the Manager. @@ -76,10 +200,7 @@ func (r *NetV1IngressReconciler) Reconcile(ctx context.Context, req ctrl.Request // clean the object up if it's being deleted if !obj.DeletionTimestamp.IsZero() && time.Now().After(obj.DeletionTimestamp.Time) { log.Info("resource is being deleted, its configuration will be removed", "type", "Ingress", "namespace", req.Namespace, "name", req.Name) - if err := mgrutils.CacheStores.IngressV1.Delete(obj); err != nil { - return ctrl.Result{}, err - } - if err := ctrlutils.UpdateKongAdmin(ctx, r.ProxyUpdateParams); err != nil { + if err := r.Proxy.DeleteObject(obj); err != nil { return ctrl.Result{}, err } return ctrlutils.CleanupFinalizer(ctx, r.Client, log, req.NamespacedName, obj) @@ -96,13 +217,13 @@ func (r *NetV1IngressReconciler) Reconcile(ctx context.Context, req ctrl.Request return ctrl.Result{Requeue: true}, nil } - // cache the new object - if err := mgrutils.CacheStores.IngressV1.Add(obj); err != nil { + // update the kong Admin API with the changes + log.Info("updating the proxy with new Ingress", "namespace", obj.Namespace, "name", obj.Name) + if err := r.Proxy.UpdateObject(obj); err != nil { return ctrl.Result{}, err } - // update the kong Admin API with the changes - return ctrl.Result{}, ctrlutils.UpdateKongAdmin(ctx, r.ProxyUpdateParams) + return ctrl.Result{}, nil } // ----------------------------------------------------------------------------- @@ -115,8 +236,7 @@ type NetV1Beta1IngressReconciler struct { Log logr.Logger Scheme *runtime.Scheme - - ProxyUpdateParams ctrlutils.ProxyUpdateParams + Proxy proxy.Proxy } // SetupWithManager sets up the controller with the Manager. @@ -142,10 +262,7 @@ func (r *NetV1Beta1IngressReconciler) Reconcile(ctx context.Context, req ctrl.Re // clean the object up if it's being deleted if !obj.DeletionTimestamp.IsZero() && time.Now().After(obj.DeletionTimestamp.Time) { log.Info("resource is being deleted, its configuration will be removed", "type", "Ingress", "namespace", req.Namespace, "name", req.Name) - if err := mgrutils.CacheStores.IngressV1beta1.Delete(obj); err != nil { - return ctrl.Result{}, err - } - if err := ctrlutils.UpdateKongAdmin(ctx, r.ProxyUpdateParams); err != nil { + if err := r.Proxy.DeleteObject(obj); err != nil { return ctrl.Result{}, err } return ctrlutils.CleanupFinalizer(ctx, r.Client, log, req.NamespacedName, obj) @@ -162,13 +279,13 @@ func (r *NetV1Beta1IngressReconciler) Reconcile(ctx context.Context, req ctrl.Re return ctrl.Result{Requeue: true}, nil } - // cache the new object - if err := mgrutils.CacheStores.IngressV1beta1.Add(obj); err != nil { + // update the kong Admin API with the changes + log.Info("updating the proxy with new Ingress", "namespace", obj.Namespace, "name", obj.Name) + if err := r.Proxy.UpdateObject(obj); err != nil { return ctrl.Result{}, err } - // update the kong Admin API with the changes - return ctrl.Result{}, ctrlutils.UpdateKongAdmin(ctx, r.ProxyUpdateParams) + return ctrl.Result{}, nil } // ----------------------------------------------------------------------------- @@ -181,8 +298,7 @@ type ExtV1Beta1IngressReconciler struct { Log logr.Logger Scheme *runtime.Scheme - - ProxyUpdateParams ctrlutils.ProxyUpdateParams + Proxy proxy.Proxy } // SetupWithManager sets up the controller with the Manager. @@ -208,10 +324,7 @@ func (r *ExtV1Beta1IngressReconciler) Reconcile(ctx context.Context, req ctrl.Re // clean the object up if it's being deleted if !obj.DeletionTimestamp.IsZero() && time.Now().After(obj.DeletionTimestamp.Time) { log.Info("resource is being deleted, its configuration will be removed", "type", "Ingress", "namespace", req.Namespace, "name", req.Name) - if err := mgrutils.CacheStores.IngressV1beta1.Delete(obj); err != nil { - return ctrl.Result{}, err - } - if err := ctrlutils.UpdateKongAdmin(ctx, r.ProxyUpdateParams); err != nil { + if err := r.Proxy.DeleteObject(obj); err != nil { return ctrl.Result{}, err } return ctrlutils.CleanupFinalizer(ctx, r.Client, log, req.NamespacedName, obj) @@ -228,13 +341,13 @@ func (r *ExtV1Beta1IngressReconciler) Reconcile(ctx context.Context, req ctrl.Re return ctrl.Result{Requeue: true}, nil } - // cache the new object - if err := mgrutils.CacheStores.IngressV1beta1.Add(obj); err != nil { + // update the kong Admin API with the changes + log.Info("updating the proxy with new Ingress", "namespace", obj.Namespace, "name", obj.Name) + if err := r.Proxy.UpdateObject(obj); err != nil { return ctrl.Result{}, err } - // update the kong Admin API with the changes - return ctrl.Result{}, ctrlutils.UpdateKongAdmin(ctx, r.ProxyUpdateParams) + return ctrl.Result{}, nil } // ----------------------------------------------------------------------------- @@ -247,8 +360,7 @@ type KongV1KongIngressReconciler struct { Log logr.Logger Scheme *runtime.Scheme - - ProxyUpdateParams ctrlutils.ProxyUpdateParams + Proxy proxy.Proxy } // SetupWithManager sets up the controller with the Manager. @@ -274,10 +386,7 @@ func (r *KongV1KongIngressReconciler) Reconcile(ctx context.Context, req ctrl.Re // clean the object up if it's being deleted if !obj.DeletionTimestamp.IsZero() && time.Now().After(obj.DeletionTimestamp.Time) { log.Info("resource is being deleted, its configuration will be removed", "type", "KongIngress", "namespace", req.Namespace, "name", req.Name) - if err := mgrutils.CacheStores.KongIngress.Delete(obj); err != nil { - return ctrl.Result{}, err - } - if err := ctrlutils.UpdateKongAdmin(ctx, r.ProxyUpdateParams); err != nil { + if err := r.Proxy.DeleteObject(obj); err != nil { return ctrl.Result{}, err } return ctrlutils.CleanupFinalizer(ctx, r.Client, log, req.NamespacedName, obj) @@ -294,13 +403,13 @@ func (r *KongV1KongIngressReconciler) Reconcile(ctx context.Context, req ctrl.Re return ctrl.Result{Requeue: true}, nil } - // cache the new object - if err := mgrutils.CacheStores.KongIngress.Add(obj); err != nil { + // update the kong Admin API with the changes + log.Info("updating the proxy with new KongIngress", "namespace", obj.Namespace, "name", obj.Name) + if err := r.Proxy.UpdateObject(obj); err != nil { return ctrl.Result{}, err } - // update the kong Admin API with the changes - return ctrl.Result{}, ctrlutils.UpdateKongAdmin(ctx, r.ProxyUpdateParams) + return ctrl.Result{}, nil } // ----------------------------------------------------------------------------- @@ -313,8 +422,7 @@ type KongV1KongPluginReconciler struct { Log logr.Logger Scheme *runtime.Scheme - - ProxyUpdateParams ctrlutils.ProxyUpdateParams + Proxy proxy.Proxy } // SetupWithManager sets up the controller with the Manager. @@ -340,10 +448,7 @@ func (r *KongV1KongPluginReconciler) Reconcile(ctx context.Context, req ctrl.Req // clean the object up if it's being deleted if !obj.DeletionTimestamp.IsZero() && time.Now().After(obj.DeletionTimestamp.Time) { log.Info("resource is being deleted, its configuration will be removed", "type", "KongPlugin", "namespace", req.Namespace, "name", req.Name) - if err := mgrutils.CacheStores.Plugin.Delete(obj); err != nil { - return ctrl.Result{}, err - } - if err := ctrlutils.UpdateKongAdmin(ctx, r.ProxyUpdateParams); err != nil { + if err := r.Proxy.DeleteObject(obj); err != nil { return ctrl.Result{}, err } return ctrlutils.CleanupFinalizer(ctx, r.Client, log, req.NamespacedName, obj) @@ -360,13 +465,13 @@ func (r *KongV1KongPluginReconciler) Reconcile(ctx context.Context, req ctrl.Req return ctrl.Result{Requeue: true}, nil } - // cache the new object - if err := mgrutils.CacheStores.Plugin.Add(obj); err != nil { + // update the kong Admin API with the changes + log.Info("updating the proxy with new KongPlugin", "namespace", obj.Namespace, "name", obj.Name) + if err := r.Proxy.UpdateObject(obj); err != nil { return ctrl.Result{}, err } - // update the kong Admin API with the changes - return ctrl.Result{}, ctrlutils.UpdateKongAdmin(ctx, r.ProxyUpdateParams) + return ctrl.Result{}, nil } // ----------------------------------------------------------------------------- @@ -379,8 +484,7 @@ type KongV1KongClusterPluginReconciler struct { Log logr.Logger Scheme *runtime.Scheme - - ProxyUpdateParams ctrlutils.ProxyUpdateParams + Proxy proxy.Proxy } // SetupWithManager sets up the controller with the Manager. @@ -406,10 +510,7 @@ func (r *KongV1KongClusterPluginReconciler) Reconcile(ctx context.Context, req c // clean the object up if it's being deleted if !obj.DeletionTimestamp.IsZero() && time.Now().After(obj.DeletionTimestamp.Time) { log.Info("resource is being deleted, its configuration will be removed", "type", "KongClusterPlugin", "namespace", req.Namespace, "name", req.Name) - if err := mgrutils.CacheStores.ClusterPlugin.Delete(obj); err != nil { - return ctrl.Result{}, err - } - if err := ctrlutils.UpdateKongAdmin(ctx, r.ProxyUpdateParams); err != nil { + if err := r.Proxy.DeleteObject(obj); err != nil { return ctrl.Result{}, err } return ctrlutils.CleanupFinalizer(ctx, r.Client, log, req.NamespacedName, obj) @@ -426,13 +527,13 @@ func (r *KongV1KongClusterPluginReconciler) Reconcile(ctx context.Context, req c return ctrl.Result{Requeue: true}, nil } - // cache the new object - if err := mgrutils.CacheStores.ClusterPlugin.Add(obj); err != nil { + // update the kong Admin API with the changes + log.Info("updating the proxy with new KongClusterPlugin", "namespace", obj.Namespace, "name", obj.Name) + if err := r.Proxy.UpdateObject(obj); err != nil { return ctrl.Result{}, err } - // update the kong Admin API with the changes - return ctrl.Result{}, ctrlutils.UpdateKongAdmin(ctx, r.ProxyUpdateParams) + return ctrl.Result{}, nil } // ----------------------------------------------------------------------------- @@ -445,8 +546,7 @@ type KongV1KongConsumerReconciler struct { Log logr.Logger Scheme *runtime.Scheme - - ProxyUpdateParams ctrlutils.ProxyUpdateParams + Proxy proxy.Proxy } // SetupWithManager sets up the controller with the Manager. @@ -472,10 +572,7 @@ func (r *KongV1KongConsumerReconciler) Reconcile(ctx context.Context, req ctrl.R // clean the object up if it's being deleted if !obj.DeletionTimestamp.IsZero() && time.Now().After(obj.DeletionTimestamp.Time) { log.Info("resource is being deleted, its configuration will be removed", "type", "KongConsumer", "namespace", req.Namespace, "name", req.Name) - if err := mgrutils.CacheStores.Consumer.Delete(obj); err != nil { - return ctrl.Result{}, err - } - if err := ctrlutils.UpdateKongAdmin(ctx, r.ProxyUpdateParams); err != nil { + if err := r.Proxy.DeleteObject(obj); err != nil { return ctrl.Result{}, err } return ctrlutils.CleanupFinalizer(ctx, r.Client, log, req.NamespacedName, obj) @@ -492,13 +589,13 @@ func (r *KongV1KongConsumerReconciler) Reconcile(ctx context.Context, req ctrl.R return ctrl.Result{Requeue: true}, nil } - // cache the new object - if err := mgrutils.CacheStores.Consumer.Add(obj); err != nil { + // update the kong Admin API with the changes + log.Info("updating the proxy with new KongConsumer", "namespace", obj.Namespace, "name", obj.Name) + if err := r.Proxy.UpdateObject(obj); err != nil { return ctrl.Result{}, err } - // update the kong Admin API with the changes - return ctrl.Result{}, ctrlutils.UpdateKongAdmin(ctx, r.ProxyUpdateParams) + return ctrl.Result{}, nil } // ----------------------------------------------------------------------------- @@ -511,8 +608,7 @@ type KongV1Alpha1UDPIngressReconciler struct { Log logr.Logger Scheme *runtime.Scheme - - ProxyUpdateParams ctrlutils.ProxyUpdateParams + Proxy proxy.Proxy } // SetupWithManager sets up the controller with the Manager. @@ -538,10 +634,7 @@ func (r *KongV1Alpha1UDPIngressReconciler) Reconcile(ctx context.Context, req ct // clean the object up if it's being deleted if !obj.DeletionTimestamp.IsZero() && time.Now().After(obj.DeletionTimestamp.Time) { log.Info("resource is being deleted, its configuration will be removed", "type", "UDPIngress", "namespace", req.Namespace, "name", req.Name) - if err := mgrutils.CacheStores.UDPIngress.Delete(obj); err != nil { - return ctrl.Result{}, err - } - if err := ctrlutils.UpdateKongAdmin(ctx, r.ProxyUpdateParams); err != nil { + if err := r.Proxy.DeleteObject(obj); err != nil { return ctrl.Result{}, err } return ctrlutils.CleanupFinalizer(ctx, r.Client, log, req.NamespacedName, obj) @@ -558,13 +651,13 @@ func (r *KongV1Alpha1UDPIngressReconciler) Reconcile(ctx context.Context, req ct return ctrl.Result{Requeue: true}, nil } - // cache the new object - if err := mgrutils.CacheStores.UDPIngress.Add(obj); err != nil { + // update the kong Admin API with the changes + log.Info("updating the proxy with new UDPIngress", "namespace", obj.Namespace, "name", obj.Name) + if err := r.Proxy.UpdateObject(obj); err != nil { return ctrl.Result{}, err } - // update the kong Admin API with the changes - return ctrl.Result{}, ctrlutils.UpdateKongAdmin(ctx, r.ProxyUpdateParams) + return ctrl.Result{}, nil } // ----------------------------------------------------------------------------- @@ -577,8 +670,7 @@ type KongV1Beta1TCPIngressReconciler struct { Log logr.Logger Scheme *runtime.Scheme - - ProxyUpdateParams ctrlutils.ProxyUpdateParams + Proxy proxy.Proxy } // SetupWithManager sets up the controller with the Manager. @@ -604,10 +696,7 @@ func (r *KongV1Beta1TCPIngressReconciler) Reconcile(ctx context.Context, req ctr // clean the object up if it's being deleted if !obj.DeletionTimestamp.IsZero() && time.Now().After(obj.DeletionTimestamp.Time) { log.Info("resource is being deleted, its configuration will be removed", "type", "TCPIngress", "namespace", req.Namespace, "name", req.Name) - if err := mgrutils.CacheStores.TCPIngress.Delete(obj); err != nil { - return ctrl.Result{}, err - } - if err := ctrlutils.UpdateKongAdmin(ctx, r.ProxyUpdateParams); err != nil { + if err := r.Proxy.DeleteObject(obj); err != nil { return ctrl.Result{}, err } return ctrlutils.CleanupFinalizer(ctx, r.Client, log, req.NamespacedName, obj) @@ -624,11 +713,11 @@ func (r *KongV1Beta1TCPIngressReconciler) Reconcile(ctx context.Context, req ctr return ctrl.Result{Requeue: true}, nil } - // cache the new object - if err := mgrutils.CacheStores.TCPIngress.Add(obj); err != nil { + // update the kong Admin API with the changes + log.Info("updating the proxy with new TCPIngress", "namespace", obj.Namespace, "name", obj.Name) + if err := r.Proxy.UpdateObject(obj); err != nil { return ctrl.Result{}, err } - // update the kong Admin API with the changes - return ctrl.Result{}, ctrlutils.UpdateKongAdmin(ctx, r.ProxyUpdateParams) + return ctrl.Result{}, nil } diff --git a/railgun/controllers/corev1/endpoints_controller.go b/railgun/controllers/corev1/endpoints_controller.go deleted file mode 100644 index 0f634d617a..0000000000 --- a/railgun/controllers/corev1/endpoints_controller.go +++ /dev/null @@ -1,83 +0,0 @@ -package corev1 - -import ( - "context" - "time" - - "github.com/go-logr/logr" - v1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/runtime" - ctrl "sigs.k8s.io/controller-runtime" - "sigs.k8s.io/controller-runtime/pkg/client" - - "github.com/kong/kubernetes-ingress-controller/railgun/internal/ctrlutils" - "github.com/kong/kubernetes-ingress-controller/railgun/internal/mgrutils" -) - -// ----------------------------------------------------------------------------- -// CoreV1 Endpoints -// ----------------------------------------------------------------------------- - -// CoreV1Endpoints reconciles a Endpoint object -type CoreV1EndpointsReconciler struct { - client.Client - - Log logr.Logger - Scheme *runtime.Scheme - - ProxyUpdateParams ctrlutils.ProxyUpdateParams -} - -// SetupWithManager sets up the controller with the Manager. -func (r *CoreV1EndpointsReconciler) SetupWithManager(mgr ctrl.Manager) error { - // TODO: this is too broad, we need to sweep for Endpoints referred to by Services we support. - // See: https://github.com/Kong/kubernetes-ingress-controller/issues/1259 - return ctrl.NewControllerManagedBy(mgr).For(&v1.Endpoints{}).Complete(r) -} - -//+kubebuilder:rbac:groups=v1,resources=endpoints,verbs=get;list;watch;create;update;patch;delete -//+kubebuilder:rbac:groups=v1,resources=endpoints/status,verbs=get;update;patch -//+kubebuilder:rbac:groups=v1,resources=endpoints/finalizers,verbs=update - -// Reconcile processes the watched objects -func (r *CoreV1EndpointsReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { - log := r.Log.WithValues("CoreV1Endpoint", req.NamespacedName) - - // get the relevant object - obj := new(v1.Endpoints) - if err := r.Get(ctx, req.NamespacedName, obj); err != nil { - return ctrl.Result{}, client.IgnoreNotFound(err) - } - log.Info("reconciling resource", "namespace", req.Namespace, "name", req.Name) - - // clean the object up if it's being deleted - if !obj.DeletionTimestamp.IsZero() && time.Now().After(obj.DeletionTimestamp.Time) { - log.Info("resource is being deleted, its configuration will be removed", "type", "Endpoint", "namespace", req.Namespace, "name", req.Name) - if err := mgrutils.CacheStores.Endpoint.Delete(obj); err != nil { - return ctrl.Result{}, err - } - if err := ctrlutils.UpdateKongAdmin(ctx, r.ProxyUpdateParams); err != nil { - return ctrl.Result{}, err - } - return ctrlutils.CleanupFinalizer(ctx, r.Client, log, req.NamespacedName, obj) - } - - // before we store cache data for this object, ensure that it has our finalizer set - if !ctrlutils.HasFinalizer(obj, ctrlutils.KongIngressFinalizer) { - log.Info("finalizer is not set for ingress object, setting it", req.Namespace, req.Name) - finalizers := obj.GetFinalizers() - obj.SetFinalizers(append(finalizers, ctrlutils.KongIngressFinalizer)) - if err := r.Client.Update(ctx, obj); err != nil { // TODO: patch here instead of update - return ctrl.Result{}, err - } - return ctrl.Result{Requeue: true}, nil - } - - // cache the new object - if err := mgrutils.CacheStores.Endpoint.Add(obj); err != nil { - return ctrl.Result{}, err - } - - // update the kong Admin API with the changes - return ctrl.Result{}, ctrlutils.UpdateKongAdmin(ctx, r.ProxyUpdateParams) -} diff --git a/railgun/controllers/corev1/service_controller.go b/railgun/controllers/corev1/service_controller.go deleted file mode 100644 index d99cb25db4..0000000000 --- a/railgun/controllers/corev1/service_controller.go +++ /dev/null @@ -1,83 +0,0 @@ -package corev1 - -import ( - "context" - "time" - - "github.com/go-logr/logr" - v1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/runtime" - ctrl "sigs.k8s.io/controller-runtime" - "sigs.k8s.io/controller-runtime/pkg/client" - - "github.com/kong/kubernetes-ingress-controller/railgun/internal/ctrlutils" - "github.com/kong/kubernetes-ingress-controller/railgun/internal/mgrutils" -) - -// ----------------------------------------------------------------------------- -// CoreV1 Service -// ----------------------------------------------------------------------------- - -// CoreV1Service reconciles a Service object -type CoreV1ServiceReconciler struct { - client.Client - - Log logr.Logger - Scheme *runtime.Scheme - - ProxyUpdateParams ctrlutils.ProxyUpdateParams -} - -// SetupWithManager sets up the controller with the Manager. -func (r *CoreV1ServiceReconciler) SetupWithManager(mgr ctrl.Manager) error { - // TODO: this is too broad, we need to sweep for Services referred to by other objects we support not all - // See: https://github.com/Kong/kubernetes-ingress-controller/issues/1259 - return ctrl.NewControllerManagedBy(mgr).For(&v1.Service{}).Complete(r) -} - -//+kubebuilder:rbac:groups=v1,resources=services,verbs=get;list;watch;create;update;patch;delete -//+kubebuilder:rbac:groups=v1,resources=services/status,verbs=get;update;patch -//+kubebuilder:rbac:groups=v1,resources=services/finalizers,verbs=update - -// Reconcile processes the watched objects -func (r *CoreV1ServiceReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { - log := r.Log.WithValues("CoreV1Service", req.NamespacedName) - - // get the relevant object - obj := new(v1.Service) - if err := r.Get(ctx, req.NamespacedName, obj); err != nil { - return ctrl.Result{}, client.IgnoreNotFound(err) - } - log.Info("reconciling resource", "namespace", req.Namespace, "name", req.Name) - - // clean the object up if it's being deleted - if !obj.DeletionTimestamp.IsZero() && time.Now().After(obj.DeletionTimestamp.Time) { - log.Info("resource is being deleted, its configuration will be removed", "type", "Service", "namespace", req.Namespace, "name", req.Name) - if err := mgrutils.CacheStores.Service.Delete(obj); err != nil { - return ctrl.Result{}, err - } - if err := ctrlutils.UpdateKongAdmin(ctx, r.ProxyUpdateParams); err != nil { - return ctrl.Result{}, err - } - return ctrlutils.CleanupFinalizer(ctx, r.Client, log, req.NamespacedName, obj) - } - - // before we store cache data for this object, ensure that it has our finalizer set - if !ctrlutils.HasFinalizer(obj, ctrlutils.KongIngressFinalizer) { - log.Info("finalizer is not set for ingress object, setting it", req.Namespace, req.Name) - finalizers := obj.GetFinalizers() - obj.SetFinalizers(append(finalizers, ctrlutils.KongIngressFinalizer)) - if err := r.Client.Update(ctx, obj); err != nil { // TODO: patch here instead of update - return ctrl.Result{}, err - } - return ctrl.Result{Requeue: true}, nil - } - - // cache the new object - if err := mgrutils.CacheStores.Service.Add(obj); err != nil { - return ctrl.Result{}, err - } - - // update the kong Admin API with the changes - return ctrl.Result{}, ctrlutils.UpdateKongAdmin(ctx, r.ProxyUpdateParams) -} diff --git a/railgun/hack/generators/controllers/networking/main.go b/railgun/hack/generators/controllers/networking/main.go index a2f51fa927..e47a364e0b 100644 --- a/railgun/hack/generators/controllers/networking/main.go +++ b/railgun/hack/generators/controllers/networking/main.go @@ -19,6 +19,24 @@ const outputFile = "controllers/configuration/zz_generated_controllers.go" // for support, add it here and a new controller will be generated // when you run `make controllers`. var inputControllersNeeded = &typesNeeded{ + typeNeeded{ + PackageImportAlias: "corev1", + PackageAlias: "CoreV1", + Package: "k8s.io/api/core/v1", + Type: "Service", + Plural: "services", + URL: "v1", + CacheType: "Service", + }, + typeNeeded{ + PackageImportAlias: "corev1", + PackageAlias: "CoreV1", + Package: "k8s.io/api/core/v1", + Type: "Endpoints", + Plural: "endpoints", + URL: "v1", + CacheType: "Endpoint", + }, typeNeeded{ PackageImportAlias: "netv1", PackageAlias: "NetV1", @@ -190,6 +208,7 @@ import ( "github.com/go-logr/logr" + corev1 "k8s.io/api/core/v1" extv1beta1 "k8s.io/api/extensions/v1beta1" netv1 "k8s.io/api/networking/v1" netv1beta1 "k8s.io/api/networking/v1beta1" @@ -202,7 +221,7 @@ import ( kongv1beta1 "github.com/kong/kubernetes-ingress-controller/railgun/apis/configuration/v1beta1" "github.com/kong/kubernetes-ingress-controller/railgun/internal/ctrlutils" - "github.com/kong/kubernetes-ingress-controller/railgun/internal/mgrutils" + "github.com/kong/kubernetes-ingress-controller/railgun/internal/proxy" ) ` @@ -217,8 +236,7 @@ type {{.PackageAlias}}{{.Type}}Reconciler struct { Log logr.Logger Scheme *runtime.Scheme - - ProxyUpdateParams ctrlutils.ProxyUpdateParams + Proxy proxy.Proxy } // SetupWithManager sets up the controller with the Manager. @@ -244,10 +262,7 @@ func (r *{{.PackageAlias}}{{.Type}}Reconciler) Reconcile(ctx context.Context, re // clean the object up if it's being deleted if !obj.DeletionTimestamp.IsZero() && time.Now().After(obj.DeletionTimestamp.Time) { log.Info("resource is being deleted, its configuration will be removed", "type", "{{.Type}}", "namespace", req.Namespace, "name", req.Name) - if err := mgrutils.CacheStores.{{.CacheType}}.Delete(obj); err != nil { - return ctrl.Result{}, err - } - if err := ctrlutils.UpdateKongAdmin(ctx, r.ProxyUpdateParams); err != nil { + if err := r.Proxy.DeleteObject(obj); err != nil { return ctrl.Result{}, err } return ctrlutils.CleanupFinalizer(ctx, r.Client, log, req.NamespacedName, obj) @@ -264,12 +279,12 @@ func (r *{{.PackageAlias}}{{.Type}}Reconciler) Reconcile(ctx context.Context, re return ctrl.Result{Requeue: true}, nil } - // cache the new object - if err := mgrutils.CacheStores.{{.CacheType}}.Add(obj); err != nil { + // update the kong Admin API with the changes + log.Info("updating the proxy with new {{.Type}}", "namespace", obj.Namespace, "name", obj.Name) + if err := r.Proxy.UpdateObject(obj); err != nil { return ctrl.Result{}, err } - // update the kong Admin API with the changes - return ctrl.Result{}, ctrlutils.UpdateKongAdmin(ctx, r.ProxyUpdateParams) + return ctrl.Result{}, nil } ` diff --git a/railgun/internal/ctrlutils/kong.go b/railgun/internal/ctrlutils/kong.go deleted file mode 100644 index 7c378e4718..0000000000 --- a/railgun/internal/ctrlutils/kong.go +++ /dev/null @@ -1,12 +0,0 @@ -package ctrlutils - -import "github.com/kong/kubernetes-ingress-controller/pkg/sendconfig" - -// ProxyUpdateParams defines all the attrs needed to perform a full configuration update on the Kong Admin API. -type ProxyUpdateParams struct { - IngressClassName string - KongConfig sendconfig.Kong - ProcessClasslessIngressV1Beta1 bool - ProcessClasslessIngressV1 bool - ProcessClasslessKongConsumer bool -} diff --git a/railgun/internal/ctrlutils/utils.go b/railgun/internal/ctrlutils/utils.go index 32d0e9ef45..96f1a2183e 100644 --- a/railgun/internal/ctrlutils/utils.go +++ b/railgun/internal/ctrlutils/utils.go @@ -3,47 +3,14 @@ package ctrlutils import ( "context" "strings" - "time" "github.com/go-logr/logr" - "github.com/kong/kubernetes-ingress-controller/pkg/deckgen" - "github.com/kong/kubernetes-ingress-controller/pkg/parser" - "github.com/kong/kubernetes-ingress-controller/pkg/sendconfig" - "github.com/kong/kubernetes-ingress-controller/pkg/store" - "github.com/kong/kubernetes-ingress-controller/railgun/internal/mgrutils" - "github.com/sirupsen/logrus" "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/types" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" ) -// KongIngressFinalizer is the finalizer used to ensure Kong configuration cleanup for deleted resources. -const KongIngressFinalizer = "configuration.konghq.com/ingress" - -// UpdateKongAdmin is a helper function to take the contents of a Kong config and update the Admin API with the parsed contents. -func UpdateKongAdmin(ctx context.Context, params ProxyUpdateParams) error { - // build the kongstate object from the Kubernetes objects in the storer - storer := store.New(*mgrutils.CacheStores, params.IngressClassName, params.ProcessClasslessIngressV1, params.ProcessClasslessIngressV1Beta1, params.ProcessClasslessKongConsumer, logrus.StandardLogger()) - kongstate, err := parser.Build(logrus.StandardLogger(), storer) - if err != nil { - return err - } - - // generate the deck configuration to be applied to the admin API - targetConfig := deckgen.ToDeckContent(ctx, logrus.StandardLogger(), kongstate, nil, nil) - - // apply the configuration update in Kong - timedCtx, cancel := context.WithTimeout(ctx, 10*time.Second) - defer cancel() - _, err = sendconfig.PerformUpdate(timedCtx, logrus.StandardLogger(), ¶ms.KongConfig, true, false, targetConfig, nil, nil, nil) - if err != nil { - return err - } - - return nil -} - // CleanupFinalizer ensures that a deleted resource is no longer present in the object cache. func CleanupFinalizer(ctx context.Context, c client.Client, log logr.Logger, nsn types.NamespacedName, obj client.Object) (ctrl.Result, error) { if HasFinalizer(obj, KongIngressFinalizer) { diff --git a/railgun/internal/ctrlutils/vars.go b/railgun/internal/ctrlutils/vars.go index 65fa34d268..a77109001b 100644 --- a/railgun/internal/ctrlutils/vars.go +++ b/railgun/internal/ctrlutils/vars.go @@ -31,4 +31,7 @@ var ( // ProxyInstanceLabel is a label used for controllers (such as the secret configuration // controller) to identify which pods are running the Kong proxy which needs to be configured. ProxyInstanceLabel = "konghq.com/proxy-instance" + + // KongIngressFinalizer is the finalizer used to ensure Kong configuration cleanup for deleted resources. + KongIngressFinalizer = "configuration.konghq.com/ingress" ) diff --git a/railgun/internal/mgrutils/store.go b/railgun/internal/mgrutils/store.go deleted file mode 100644 index 78f46548b4..0000000000 --- a/railgun/internal/mgrutils/store.go +++ /dev/null @@ -1,15 +0,0 @@ -package mgrutils - -import ( - "github.com/kong/kubernetes-ingress-controller/pkg/store" -) - -var ( - // CacheStores is the global cache for all controllers to store and retrieve Kubernetes objects from cache. - CacheStores *store.CacheStores -) - -func init() { - newCacheStores := store.NewCacheStores() - CacheStores = &newCacheStores -} diff --git a/railgun/internal/proxy/clientgo_cached_proxy_resolver.go b/railgun/internal/proxy/clientgo_cached_proxy_resolver.go new file mode 100644 index 0000000000..856b75b8e0 --- /dev/null +++ b/railgun/internal/proxy/clientgo_cached_proxy_resolver.go @@ -0,0 +1,227 @@ +package proxy + +import ( + "context" + "fmt" + "time" + + "github.com/go-logr/logr" + "github.com/sirupsen/logrus" + "k8s.io/apimachinery/pkg/runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + + "github.com/kong/kubernetes-ingress-controller/pkg/deckgen" + "github.com/kong/kubernetes-ingress-controller/pkg/parser" + "github.com/kong/kubernetes-ingress-controller/pkg/sendconfig" + "github.com/kong/kubernetes-ingress-controller/pkg/store" +) + +// ----------------------------------------------------------------------------- +// Client Go Cached Proxy Resolver - Public Functions +// ----------------------------------------------------------------------------- + +// NewCacheBasedProxy will provide a new Proxy object. Note that this starts some background services +// and the caller is thereafter responsible for closing the Proxy.StopCh. +func NewCacheBasedProxy(ctx context.Context, logger logr.Logger, k8s client.Client, kongConfig sendconfig.Kong, ingressClassName string, processClasslessIngressV1Beta1 bool, processClasslessIngressV1 bool, processClasslessKongConsumer bool) Proxy { + return NewCacheBasedProxyWithStagger(ctx, logger, k8s, kongConfig, ingressClassName, processClasslessIngressV1Beta1, processClasslessIngressV1, processClasslessKongConsumer, DefaultStagger) +} + +func NewCacheBasedProxyWithStagger(ctx context.Context, logger logr.Logger, k8s client.Client, kongConfig sendconfig.Kong, ingressClassName string, processClasslessIngressV1Beta1 bool, processClasslessIngressV1 bool, processClasslessKongConsumer bool, stagger time.Duration) Proxy { + cache := store.NewCacheStores() + proxy := &clientgoCachedProxyResolver{ + kongConfig: kongConfig, + cache: &cache, + logger: logr.Discard(), + + ingressClassName: ingressClassName, + processClasslessIngressV1Beta1: processClasslessIngressV1Beta1, + processClasslessIngressV1: processClasslessIngressV1, + processClasslessKongConsumer: processClasslessKongConsumer, + stopCh: make(chan struct{}), + + ctx: ctx, + update: make(chan *cachedObject, DefaultObjectBufferSize), + del: make(chan *cachedObject, DefaultObjectBufferSize), + stagger: stagger, + } + go proxy.startCacheServer() + return proxy +} + +// ----------------------------------------------------------------------------- +// Client Go Cached Proxy Resolver - Private Types +// ----------------------------------------------------------------------------- + +// clientgoCachedProxyResolver represents the cached objects and Kong DSL configuration. +// +// This implements the Proxy interface to provide asynchronous, non-blocking updates to +// the Kong Admin API for controller-runtime based controller managers. +// +// This object's attributes are immutable (private), and it is threadsafe. +type clientgoCachedProxyResolver struct { + // kubernetes configuration + k8s client.Client + cache *store.CacheStores + + // kong configuration + kongConfig sendconfig.Kong + + // cache store configuration options + ingressClassName string + processClasslessIngressV1Beta1 bool + processClasslessIngressV1 bool + processClasslessKongConsumer bool + + // cache server flow control, channels and utility attributes + ctx context.Context + stagger time.Duration + logger logr.Logger + stopCh chan struct{} + + // channels + update chan *cachedObject + del chan *cachedObject +} + +// cacheAction indicates what caching action (update, delete) was taken for any particular runtime.Object. +type cacheAction string + +var ( + // updated indicates that this object was either newly added OR updated in the cache (no distinction made) + updated cacheAction = "updated" + + // deleted indicates that this object was removed from the cache + deleted cacheAction = "deleted" +) + +// cachedObject represents an object that has been processed by the cacheServer +type cachedObject struct { + action cacheAction + err error + + key string + runtimeObj runtime.Object +} + +// objectTracker is a secondary cache used to track objects that have been updated/deleted between successful updates of the Kong Admin API. +type objectTracker map[string]*cachedObject + +// ----------------------------------------------------------------------------- +// Client Go Cached Proxy Resolver - Public Methods - Interface Implementation +// ----------------------------------------------------------------------------- + +func (p *clientgoCachedProxyResolver) UpdateObject(obj client.Object) error { + cobj := &cachedObject{action: updated, key: p.clientObjectKey(obj), runtimeObj: obj.DeepCopyObject()} + select { + case p.update <- cobj: + return nil + default: + return fmt.Errorf("the proxy is too busy to accept requests at this time, try again later") + } +} + +func (p *clientgoCachedProxyResolver) DeleteObject(obj client.Object) error { + cobj := &cachedObject{action: deleted, key: p.clientObjectKey(obj), runtimeObj: obj.DeepCopyObject()} + select { + case p.del <- cobj: + return nil + default: + return fmt.Errorf("the proxy is too busy to accept requests at this time, try again later") + } +} + +// ----------------------------------------------------------------------------- +// Client Go Cached Proxy Resolver - Private Methods - Cache Server +// ----------------------------------------------------------------------------- + +// startCacheServer runs a server in a background goroutine that is responsible for: +// +// 1. processing kubernetes object updates (add, replace) +// 2. processing kubernetes object deletes +// 3. regularly synchronizing configuration to the Kong Admin API (staggered) +// +// While processing objects the cacheServer will (synchronously) convert the objects to kong DSL and +// submit POST updates to the Kong Admin API with the new configuration. +func (p *clientgoCachedProxyResolver) startCacheServer() { + p.logger.Info("the proxy cache server has been started") + + // syncTimer is a regular interval to check for cache updates and resolve the cache to the Kong Admin API + syncTicker := time.NewTicker(p.stagger) + + // updates tracks whether any updates/deletes were tracked this cycle + updates := false + for { + select { + case cobj := <-p.update: + if err := p.cacheUpdate(cobj); err != nil { + p.logger.Error(err, "object could not be updated in the cache and will be discarded") + break + } + updates = true + case cobj := <-p.del: + if err := p.cacheDelete(cobj); err != nil { + p.logger.Error(err, "object could not be deleted from the cache and will be discarded") + break + } + updates = true + case <-syncTicker.C: + // if there are no relevant object updates for this cycle, then there's no reason to + // bother the Kong Proxy with updates from the cache. + if !updates { + break + } + if err := p.updateKongAdmin(); err != nil { + p.logger.Error(err, "could not update kong admin") + } + updates = false + case <-p.ctx.Done(): + p.logger.Info("the proxy cache server's context is done, shutting down") + return + } + } +} + +// ----------------------------------------------------------------------------- +// Client Go Cached Proxy Resolver - Private Methods - Cache Server Utils +// ----------------------------------------------------------------------------- + +// objectKey provides a key unique to the cacheServer for tracking objects that are being resolved. +func (p *clientgoCachedProxyResolver) clientObjectKey(obj client.Object) string { + return fmt.Sprintf("%s/%s/%s", obj.GetObjectKind().GroupVersionKind().String(), obj.GetNamespace(), obj.GetName()) +} + +// cacheUpdate caches the provided object to the proxy cache and the provided object tracker and reports any errors. +func (p *clientgoCachedProxyResolver) cacheUpdate(cobj *cachedObject) error { + cobj.err = p.cache.Add(cobj.runtimeObj) + return cobj.err +} + +// cacheDelete removes the cache entry the provided object from the proxy cache and the provided object tracker and reports any errors. +func (p *clientgoCachedProxyResolver) cacheDelete(cobj *cachedObject) error { + cobj.err = p.cache.Delete(cobj.runtimeObj) + return cobj.err +} + +// updateKongAdmin will take whatever the current state of the Proxy.cache is an convert that to Kong DSL +// and apply the resulting configuration to the Kong Admin API. +func (p *clientgoCachedProxyResolver) updateKongAdmin() error { + // build the kongstate object from the Kubernetes objects in the storer + storer := store.New(*p.cache, p.ingressClassName, p.processClasslessIngressV1, p.processClasslessIngressV1Beta1, p.processClasslessKongConsumer, logrus.StandardLogger()) + kongstate, err := parser.Build(logrus.StandardLogger(), storer) + if err != nil { + return err + } + + // generate the deck configuration to be applied to the admin API + targetConfig := deckgen.ToDeckContent(p.ctx, logrus.StandardLogger(), kongstate, nil, nil) + + // apply the configuration update in Kong + timedCtx, cancel := context.WithTimeout(p.ctx, 10*time.Second) + defer cancel() + _, err = sendconfig.PerformUpdate(timedCtx, logrus.StandardLogger(), &p.kongConfig, true, false, targetConfig, nil, nil, nil) + if err != nil { + return err + } + + return nil +} diff --git a/railgun/internal/proxy/proxy.go b/railgun/internal/proxy/proxy.go new file mode 100644 index 0000000000..f6c95e5ccc --- /dev/null +++ b/railgun/internal/proxy/proxy.go @@ -0,0 +1,56 @@ +package proxy + +import ( + "time" + + "sigs.k8s.io/controller-runtime/pkg/client" +) + +// ----------------------------------------------------------------------------- +// Proxy - Public Vars +// ----------------------------------------------------------------------------- + +const ( + // DefaultStagger indicates the time.Duration (minimum) that will occur between + // updates to the Kong Proxy Admin API when using the NewProxy() constructor. + // Use the NewProxyWithStagger() constructor to provide your own duration. + DefaultStagger time.Duration = time.Second * 3 + + // DefaultObjectBufferSize is the number of client.Objects that the server will buffer + // before it starts rejecting new objects while it processes the originals. + // If you get to the point that objects are rejected, you'll find that the + // UpdateObject() and DeleteObject() methods will start throwing errors and you'll + // need to retry queing the object at a later time. + // + // NOTE: implementations of the Proxy interface should error, not block on full buffer. + // + // TODO: the current default of 50 is based on a loose approximation to allow ~5mb + // of buffer space for client.Objects and assuming a throughput of ~50 API + // updates per second, but in the future we may want to make this configurable, + // provide metrics for it, and furthermore automate detecting good values for it. + // depending on configuration and/or available system memory and the amount of + // throughput (in Kubernetes object updates) that the API is meant to handle. + DefaultObjectBufferSize = 500 +) + +// ----------------------------------------------------------------------------- +// Proxy - Public Types +// ----------------------------------------------------------------------------- + +// Proxy represents the Kong Proxy from the perspective of Kubernetes allowing +// callers to update and remove Kubernetes objects in the backend proxy without +// having to understand or be aware of Kong DSLs or how types are converted between +// Kubernetes and the Kong Admin API. +// +// NOTE: implementations of this interface are: threadsafe, non-blocking +type Proxy interface { + // UpdateObject accepts a Kubernetes controller-runtime client.Object and adds/updates that to the configuration cache. + // It will be asynchronously converted into the upstream Kong DSL and applied to the Kong Admin API. + // A status will later be added to the object whether the configuration update succeeds or fails. + UpdateObject(obj client.Object) error + + // DeleteObject accepts a Kubernetes controller-runtime client.Object and removes it from the configuration cache. + // The delete action will asynchronously be converted to Kong DSL and applied to the Kong Admin API. + // A status will later be added to the object whether the configuration update succeeds or fails. + DeleteObject(obj client.Object) error +} diff --git a/railgun/manager/manager.go b/railgun/manager/manager.go index af26c5bbea..cfe30db146 100644 --- a/railgun/manager/manager.go +++ b/railgun/manager/manager.go @@ -28,9 +28,9 @@ import ( configurationv1beta1 "github.com/kong/kubernetes-ingress-controller/railgun/apis/configuration/v1beta1" "github.com/kong/kubernetes-ingress-controller/railgun/controllers/configuration" kongctrl "github.com/kong/kubernetes-ingress-controller/railgun/controllers/configuration" - "github.com/kong/kubernetes-ingress-controller/railgun/controllers/corev1" "github.com/kong/kubernetes-ingress-controller/railgun/internal/ctrlutils" "github.com/kong/kubernetes-ingress-controller/railgun/internal/mgrutils" + "github.com/kong/kubernetes-ingress-controller/railgun/internal/proxy" ) var ( @@ -243,42 +243,32 @@ func Run(ctx context.Context, c *Config) error { return err } - kongCFG := sendconfig.Kong{ + kongConfig := sendconfig.Kong{ URL: c.KongURL, FilterTags: []string{c.FilterTag}, Concurrency: c.Concurrency, Client: kongClient, } + prx := proxy.NewCacheBasedProxy(ctx, setupLog.WithName("proxy-cache-resolver"), mgr.GetClient(), kongConfig, c.IngressClassName, c.ProcessClasslessIngressV1Beta1, c.ProcessClasslessIngressV1, c.ProcessClasslessKongConsumer) + controllers := []ControllerDef{ { IsEnabled: &c.ServiceEnabled, - Controller: &corev1.CoreV1ServiceReconciler{ + Controller: &configuration.CoreV1ServiceReconciler{ Client: mgr.GetClient(), Log: ctrl.Log.WithName("controllers").WithName("Service"), Scheme: mgr.GetScheme(), - ProxyUpdateParams: ctrlutils.ProxyUpdateParams{ - IngressClassName: c.IngressClassName, - KongConfig: kongCFG, - ProcessClasslessIngressV1: c.ProcessClasslessIngressV1, - ProcessClasslessIngressV1Beta1: c.ProcessClasslessIngressV1Beta1, - ProcessClasslessKongConsumer: c.ProcessClasslessKongConsumer, - }, + Proxy: prx, }, }, { IsEnabled: &c.ServiceEnabled, - Controller: &corev1.CoreV1EndpointsReconciler{ + Controller: &configuration.CoreV1EndpointsReconciler{ Client: mgr.GetClient(), Log: ctrl.Log.WithName("controllers").WithName("Endpoints"), Scheme: mgr.GetScheme(), - ProxyUpdateParams: ctrlutils.ProxyUpdateParams{ - IngressClassName: c.IngressClassName, - KongConfig: kongCFG, - ProcessClasslessIngressV1: c.ProcessClasslessIngressV1, - ProcessClasslessIngressV1Beta1: c.ProcessClasslessIngressV1Beta1, - ProcessClasslessKongConsumer: c.ProcessClasslessKongConsumer, - }, + Proxy: prx, }, }, @@ -288,13 +278,7 @@ func Run(ctx context.Context, c *Config) error { Client: mgr.GetClient(), Log: ctrl.Log.WithName("controllers").WithName("Ingress"), Scheme: mgr.GetScheme(), - ProxyUpdateParams: ctrlutils.ProxyUpdateParams{ - IngressClassName: c.IngressClassName, - KongConfig: kongCFG, - ProcessClasslessIngressV1: c.ProcessClasslessIngressV1, - ProcessClasslessIngressV1Beta1: c.ProcessClasslessIngressV1Beta1, - ProcessClasslessKongConsumer: c.ProcessClasslessKongConsumer, - }, + Proxy: prx, }, }, { @@ -303,13 +287,7 @@ func Run(ctx context.Context, c *Config) error { Client: mgr.GetClient(), Log: ctrl.Log.WithName("controllers").WithName("Ingress"), Scheme: mgr.GetScheme(), - ProxyUpdateParams: ctrlutils.ProxyUpdateParams{ - IngressClassName: c.IngressClassName, - KongConfig: kongCFG, - ProcessClasslessIngressV1: c.ProcessClasslessIngressV1, - ProcessClasslessIngressV1Beta1: c.ProcessClasslessIngressV1Beta1, - ProcessClasslessKongConsumer: c.ProcessClasslessKongConsumer, - }, + Proxy: prx, }, }, { @@ -318,13 +296,7 @@ func Run(ctx context.Context, c *Config) error { Client: mgr.GetClient(), Log: ctrl.Log.WithName("controllers").WithName("Ingress"), Scheme: mgr.GetScheme(), - ProxyUpdateParams: ctrlutils.ProxyUpdateParams{ - IngressClassName: c.IngressClassName, - KongConfig: kongCFG, - ProcessClasslessIngressV1: c.ProcessClasslessIngressV1, - ProcessClasslessIngressV1Beta1: c.ProcessClasslessIngressV1Beta1, - ProcessClasslessKongConsumer: c.ProcessClasslessKongConsumer, - }, + Proxy: prx, }, }, { @@ -333,13 +305,7 @@ func Run(ctx context.Context, c *Config) error { Client: mgr.GetClient(), Log: ctrl.Log.WithName("controllers").WithName("UDPIngress"), Scheme: mgr.GetScheme(), - ProxyUpdateParams: ctrlutils.ProxyUpdateParams{ - IngressClassName: c.IngressClassName, - KongConfig: kongCFG, - ProcessClasslessIngressV1: c.ProcessClasslessIngressV1, - ProcessClasslessIngressV1Beta1: c.ProcessClasslessIngressV1Beta1, - ProcessClasslessKongConsumer: c.ProcessClasslessKongConsumer, - }, + Proxy: prx, }, }, { @@ -348,13 +314,7 @@ func Run(ctx context.Context, c *Config) error { Client: mgr.GetClient(), Log: ctrl.Log.WithName("controllers").WithName("TCPIngress"), Scheme: mgr.GetScheme(), - ProxyUpdateParams: ctrlutils.ProxyUpdateParams{ - IngressClassName: c.IngressClassName, - KongConfig: kongCFG, - ProcessClasslessIngressV1: c.ProcessClasslessIngressV1, - ProcessClasslessIngressV1Beta1: c.ProcessClasslessIngressV1Beta1, - ProcessClasslessKongConsumer: c.ProcessClasslessKongConsumer, - }, + Proxy: prx, }, }, { @@ -363,13 +323,7 @@ func Run(ctx context.Context, c *Config) error { Client: mgr.GetClient(), Log: ctrl.Log.WithName("controllers").WithName("KongIngress"), Scheme: mgr.GetScheme(), - ProxyUpdateParams: ctrlutils.ProxyUpdateParams{ - IngressClassName: c.IngressClassName, - KongConfig: kongCFG, - ProcessClasslessIngressV1: c.ProcessClasslessIngressV1, - ProcessClasslessIngressV1Beta1: c.ProcessClasslessIngressV1Beta1, - ProcessClasslessKongConsumer: c.ProcessClasslessKongConsumer, - }, + Proxy: prx, }, }, { @@ -378,13 +332,7 @@ func Run(ctx context.Context, c *Config) error { Client: mgr.GetClient(), Log: ctrl.Log.WithName("controllers").WithName("KongClusterPlugin"), Scheme: mgr.GetScheme(), - ProxyUpdateParams: ctrlutils.ProxyUpdateParams{ - IngressClassName: c.IngressClassName, - KongConfig: kongCFG, - ProcessClasslessIngressV1: c.ProcessClasslessIngressV1, - ProcessClasslessIngressV1Beta1: c.ProcessClasslessIngressV1Beta1, - ProcessClasslessKongConsumer: c.ProcessClasslessKongConsumer, - }, + Proxy: prx, }, }, { @@ -393,13 +341,7 @@ func Run(ctx context.Context, c *Config) error { Client: mgr.GetClient(), Log: ctrl.Log.WithName("controllers").WithName("KongPlugin"), Scheme: mgr.GetScheme(), - ProxyUpdateParams: ctrlutils.ProxyUpdateParams{ - IngressClassName: c.IngressClassName, - KongConfig: kongCFG, - ProcessClasslessIngressV1: c.ProcessClasslessIngressV1, - ProcessClasslessIngressV1Beta1: c.ProcessClasslessIngressV1Beta1, - ProcessClasslessKongConsumer: c.ProcessClasslessKongConsumer, - }, + Proxy: prx, }, }, { @@ -408,13 +350,7 @@ func Run(ctx context.Context, c *Config) error { Client: mgr.GetClient(), Log: ctrl.Log.WithName("controllers").WithName("KongConsumer"), Scheme: mgr.GetScheme(), - ProxyUpdateParams: ctrlutils.ProxyUpdateParams{ - IngressClassName: c.IngressClassName, - KongConfig: kongCFG, - ProcessClasslessIngressV1: c.ProcessClasslessIngressV1, - ProcessClasslessIngressV1Beta1: c.ProcessClasslessIngressV1Beta1, - ProcessClasslessKongConsumer: c.ProcessClasslessKongConsumer, - }, + Proxy: prx, }, }, } @@ -438,7 +374,7 @@ func Run(ctx context.Context, c *Config) error { if c.AnonymousReports { setupLog.Info("running anonymous reports") - if err := mgrutils.RunReport(ctx, kubeconfig, kongCFG, Release); err != nil { + if err := mgrutils.RunReport(ctx, kubeconfig, kongConfig, Release); err != nil { setupLog.Error(err, "anonymous reporting failed") } } else {