Skip to content

Commit

Permalink
refactor: Use Shared Rate Limiter Code (#2145)
Browse files Browse the repository at this point in the history
* refactor: Use Shared Rate Limiter Code

* fix: Introduce Check for IntendedRequeueErrors

* fix: Remove Global Variable

* chore: Remove Intended Requeue Error Check
  • Loading branch information
LeelaChacha authored Dec 30, 2024
1 parent 1fa774d commit 6d903a3
Show file tree
Hide file tree
Showing 5 changed files with 30 additions and 23 deletions.
40 changes: 21 additions & 19 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,11 @@ import (
"github.com/go-co-op/gocron"
"github.com/go-logr/logr"
"go.uber.org/zap/zapcore"
"golang.org/x/time/rate"
istioclientapiv1beta1 "istio.io/client-go/pkg/apis/networking/v1beta1"
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
machineryruntime "k8s.io/apimachinery/pkg/runtime"
machineryutilruntime "k8s.io/apimachinery/pkg/util/runtime"
k8sclientscheme "k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/util/workqueue"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/cache"
ctrlruntime "sigs.k8s.io/controller-runtime/pkg/controller"
Expand Down Expand Up @@ -168,7 +166,7 @@ func setupManager(flagVar *flags.FlagVar, cacheOptions cache.Options, scheme *ma
eventRecorder := event.NewRecorderWrapper(mgr.GetEventRecorderFor(shared.OperatorName))
skrContextProvider := remote.NewKymaSkrContextProvider(kcpClient, remoteClientCache, eventRecorder)
var skrWebhookManager *watcher.SKRWebhookManifestManager
options := controllerOptionsFromFlagVar(flagVar)
var options ctrlruntime.Options
if flagVar.EnableKcpWatcher {
if skrWebhookManager, err = createSkrWebhookManager(mgr, skrContextProvider, flagVar); err != nil {
setupLog.Error(err, "failed to create skr webhook manager")
Expand Down Expand Up @@ -266,26 +264,16 @@ func scheduleMetricsCleanup(kymaMetrics *metrics.KymaMetrics, cleanupIntervalInM
setupLog.V(log.DebugLevel).Info("scheduled job for cleaning up metrics")
}

func controllerOptionsFromFlagVar(flagVar *flags.FlagVar) ctrlruntime.Options {
return ctrlruntime.Options{
RateLimiter: workqueue.NewTypedMaxOfRateLimiter(
workqueue.NewTypedItemExponentialFailureRateLimiter[ctrl.Request](flagVar.FailureBaseDelay,
flagVar.FailureMaxDelay),
&workqueue.TypedBucketRateLimiter[ctrl.Request]{
Limiter: rate.NewLimiter(rate.Limit(flagVar.RateLimiterFrequency), flagVar.RateLimiterBurst),
},
),

CacheSyncTimeout: flagVar.CacheSyncTimeout,
}
}

func setupKymaReconciler(mgr ctrl.Manager, descriptorProvider *provider.CachedDescriptorProvider,
skrContextFactory remote.SkrContextProvider, event event.Event, flagVar *flags.FlagVar, options ctrlruntime.Options,
skrWebhookManager *watcher.SKRWebhookManifestManager, kymaMetrics *metrics.KymaMetrics,
moduleMetrics *metrics.ModuleMetrics, setupLog logr.Logger,
) {
options.RateLimiter = internal.RateLimiter(flagVar.FailureBaseDelay,
flagVar.FailureMaxDelay, flagVar.RateLimiterFrequency, flagVar.RateLimiterBurst)
options.CacheSyncTimeout = flagVar.CacheSyncTimeout
options.MaxConcurrentReconciles = flagVar.MaxConcurrentKymaReconciles

if err := (&kyma.Reconciler{
Client: mgr.GetClient(),
SkrContextFactory: skrContextFactory,
Expand Down Expand Up @@ -361,6 +349,10 @@ func setupPurgeReconciler(mgr ctrl.Manager,
options ctrlruntime.Options,
setupLog logr.Logger,
) {
options.RateLimiter = internal.RateLimiter(flagVar.FailureBaseDelay,
flagVar.FailureMaxDelay, flagVar.RateLimiterFrequency, flagVar.RateLimiterBurst)
options.CacheSyncTimeout = flagVar.CacheSyncTimeout

if err := (&purge.Reconciler{
Client: mgr.GetClient(),
SkrContextFactory: skrContextProvider,
Expand All @@ -381,9 +373,10 @@ func setupManifestReconciler(mgr ctrl.Manager, flagVar *flags.FlagVar, options c
sharedMetrics *metrics.SharedMetrics, mandatoryModulesMetrics *metrics.MandatoryModulesMetrics,
moduleMetrics *metrics.ModuleMetrics, setupLog logr.Logger, event event.Event,
) {
options.MaxConcurrentReconciles = flagVar.MaxConcurrentManifestReconciles
options.RateLimiter = internal.ManifestRateLimiter(flagVar.FailureBaseDelay,
options.RateLimiter = internal.RateLimiter(flagVar.FailureBaseDelay,
flagVar.FailureMaxDelay, flagVar.RateLimiterFrequency, flagVar.RateLimiterBurst)
options.CacheSyncTimeout = flagVar.CacheSyncTimeout
options.MaxConcurrentReconciles = flagVar.MaxConcurrentManifestReconciles

manifestClient := manifestclient.NewManifestClient(event, mgr.GetClient())

Expand All @@ -409,6 +402,9 @@ func setupManifestReconciler(mgr ctrl.Manager, flagVar *flags.FlagVar, options c
func setupKcpWatcherReconciler(mgr ctrl.Manager, options ctrlruntime.Options, event event.Event, flagVar *flags.FlagVar,
setupLog logr.Logger,
) {
options.RateLimiter = internal.RateLimiter(flagVar.FailureBaseDelay,
flagVar.FailureMaxDelay, flagVar.RateLimiterFrequency, flagVar.RateLimiterBurst)
options.CacheSyncTimeout = flagVar.CacheSyncTimeout
options.MaxConcurrentReconciles = flagVar.MaxConcurrentWatcherReconciles

if err := (&watcherctrl.Reconciler{
Expand Down Expand Up @@ -436,6 +432,9 @@ func setupMandatoryModuleReconciler(mgr ctrl.Manager,
metrics *metrics.MandatoryModulesMetrics,
setupLog logr.Logger,
) {
options.RateLimiter = internal.RateLimiter(flagVar.FailureBaseDelay,
flagVar.FailureMaxDelay, flagVar.RateLimiterFrequency, flagVar.RateLimiterBurst)
options.CacheSyncTimeout = flagVar.CacheSyncTimeout
options.MaxConcurrentReconciles = flagVar.MaxConcurrentMandatoryModuleReconciles

if err := (&mandatorymodule.InstallationReconciler{
Expand Down Expand Up @@ -463,6 +462,9 @@ func setupMandatoryModuleDeletionReconciler(mgr ctrl.Manager,
options ctrlruntime.Options,
setupLog logr.Logger,
) {
options.RateLimiter = internal.RateLimiter(flagVar.FailureBaseDelay,
flagVar.FailureMaxDelay, flagVar.RateLimiterFrequency, flagVar.RateLimiterBurst)
options.CacheSyncTimeout = flagVar.CacheSyncTimeout
options.MaxConcurrentReconciles = flagVar.MaxConcurrentMandatoryModuleDeletionReconciles

if err := (&mandatorymodule.DeletionReconciler{
Expand Down
2 changes: 1 addition & 1 deletion internal/rate_limiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
ctrl "sigs.k8s.io/controller-runtime"
)

func ManifestRateLimiter(
func RateLimiter(
failureBaseDelay time.Duration, failureMaxDelay time.Duration,
frequency int, burst int,
) workqueue.TypedRateLimiter[ctrl.Request] {
Expand Down
7 changes: 6 additions & 1 deletion tests/integration/controller/kyma/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,12 @@ var _ = BeforeSuite(func() {
RemoteSyncNamespace: flags.DefaultRemoteSyncNamespace,
Metrics: metrics.NewKymaMetrics(metrics.NewSharedMetrics()),
ModuleMetrics: metrics.NewModuleMetrics(),
}).SetupWithManager(mgr, ctrlruntime.Options{},
}).SetupWithManager(mgr, ctrlruntime.Options{
RateLimiter: internal.RateLimiter(
1*time.Second, 5*time.Second,
30, 200,
),
},
kyma.SetupOptions{ListenerAddr: randomPort})
Expect(err).ToNot(HaveOccurred())
Eventually(CreateNamespace, Timeout, Interval).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ var _ = BeforeSuite(func() {
Watches(&apicorev1.Secret{}, handler.Funcs{}).
WithOptions(
ctrlruntime.Options{
RateLimiter: internal.ManifestRateLimiter(
RateLimiter: internal.RateLimiter(
1*time.Second, 5*time.Second,
30, 200,
),
Expand Down
2 changes: 1 addition & 1 deletion tests/integration/controller/manifest/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ var _ = BeforeSuite(func() {
Watches(&apicorev1.Secret{}, handler.Funcs{}).
WithOptions(
ctrlruntime.Options{
RateLimiter: internal.ManifestRateLimiter(
RateLimiter: internal.RateLimiter(
1*time.Second, 5*time.Second,
30, 200,
),
Expand Down

0 comments on commit 6d903a3

Please sign in to comment.