Skip to content

Commit

Permalink
Add flags to configure exponential back-off retry
Browse files Browse the repository at this point in the history
Add two new flags to enable users to configure exponential
back-off for Flux objects. The default values are now
set to 750ms for minimum retry time, and 15min for max.

Signed-off-by: Paulo Gomes <paulo.gomes@weave.works>
  • Loading branch information
Paulo Gomes committed Apr 11, 2022
1 parent 711780c commit b02684b
Show file tree
Hide file tree
Showing 5 changed files with 37 additions and 4 deletions.
7 changes: 6 additions & 1 deletion controllers/bucket_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/ratelimiter"

"github.com/fluxcd/pkg/apis/meta"
"github.com/fluxcd/pkg/runtime/conditions"
Expand Down Expand Up @@ -122,6 +123,7 @@ type BucketReconciler struct {

type BucketReconcilerOptions struct {
MaxConcurrentReconciles int
RateLimiter ratelimiter.RateLimiter
}

// BucketProvider is an interface for fetching objects from a storage provider
Expand Down Expand Up @@ -235,7 +237,10 @@ func (r *BucketReconciler) SetupWithManagerAndOptions(mgr ctrl.Manager, opts Buc
return ctrl.NewControllerManagedBy(mgr).
For(&sourcev1.Bucket{}).
WithEventFilter(predicate.Or(predicate.GenerationChangedPredicate{}, predicates.ReconcileRequestedPredicate{})).
WithOptions(controller.Options{MaxConcurrentReconciles: opts.MaxConcurrentReconciles}).
WithOptions(controller.Options{
MaxConcurrentReconciles: opts.MaxConcurrentReconciles,
RateLimiter: opts.RateLimiter,
}).
Complete(r)
}

Expand Down
7 changes: 6 additions & 1 deletion controllers/gitrepository_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/ratelimiter"

"github.com/fluxcd/pkg/apis/meta"
"github.com/fluxcd/pkg/runtime/conditions"
Expand Down Expand Up @@ -118,6 +119,7 @@ type GitRepositoryReconciler struct {
type GitRepositoryReconcilerOptions struct {
MaxConcurrentReconciles int
DependencyRequeueInterval time.Duration
RateLimiter ratelimiter.RateLimiter
}

// gitRepositoryReconcileFunc is the function type for all the
Expand All @@ -135,7 +137,10 @@ func (r *GitRepositoryReconciler) SetupWithManagerAndOptions(mgr ctrl.Manager, o
For(&sourcev1.GitRepository{}, builder.WithPredicates(
predicate.Or(predicate.GenerationChangedPredicate{}, predicates.ReconcileRequestedPredicate{}),
)).
WithOptions(controller.Options{MaxConcurrentReconciles: opts.MaxConcurrentReconciles}).
WithOptions(controller.Options{
MaxConcurrentReconciles: opts.MaxConcurrentReconciles,
RateLimiter: opts.RateLimiter,
}).
Complete(r)
}

Expand Down
7 changes: 6 additions & 1 deletion controllers/helmchart_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/ratelimiter"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"sigs.k8s.io/controller-runtime/pkg/source"

Expand Down Expand Up @@ -130,6 +131,7 @@ func (r *HelmChartReconciler) SetupWithManager(mgr ctrl.Manager) error {

type HelmChartReconcilerOptions struct {
MaxConcurrentReconciles int
RateLimiter ratelimiter.RateLimiter
}

// helmChartReconcileFunc is the function type for all the v1beta2.HelmChart
Expand Down Expand Up @@ -166,7 +168,10 @@ func (r *HelmChartReconciler) SetupWithManagerAndOptions(mgr ctrl.Manager, opts
handler.EnqueueRequestsFromMapFunc(r.requestsForBucketChange),
builder.WithPredicates(SourceRevisionChangePredicate{}),
).
WithOptions(controller.Options{MaxConcurrentReconciles: opts.MaxConcurrentReconciles}).
WithOptions(controller.Options{
MaxConcurrentReconciles: opts.MaxConcurrentReconciles,
RateLimiter: opts.RateLimiter,
}).
Complete(r)
}

Expand Down
7 changes: 6 additions & 1 deletion controllers/helmrepository_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/ratelimiter"

"github.com/fluxcd/pkg/apis/meta"
"github.com/fluxcd/pkg/runtime/conditions"
Expand Down Expand Up @@ -106,6 +107,7 @@ type HelmRepositoryReconciler struct {

type HelmRepositoryReconcilerOptions struct {
MaxConcurrentReconciles int
RateLimiter ratelimiter.RateLimiter
}

// helmRepositoryReconcileFunc is the function type for all the
Expand All @@ -122,7 +124,10 @@ func (r *HelmRepositoryReconciler) SetupWithManagerAndOptions(mgr ctrl.Manager,
return ctrl.NewControllerManagedBy(mgr).
For(&sourcev1.HelmRepository{}).
WithEventFilter(predicate.Or(predicate.GenerationChangedPredicate{}, predicates.ReconcileRequestedPredicate{})).
WithOptions(controller.Options{MaxConcurrentReconciles: opts.MaxConcurrentReconciles}).
WithOptions(controller.Options{
MaxConcurrentReconciles: opts.MaxConcurrentReconciles,
RateLimiter: opts.RateLimiter,
}).
Complete(r)
}

Expand Down
13 changes: 13 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
_ "k8s.io/client-go/plugin/pkg/client/auth/gcp"
"k8s.io/client-go/util/workqueue"
ctrl "sigs.k8s.io/controller-runtime"

"github.com/fluxcd/pkg/runtime/client"
Expand Down Expand Up @@ -94,6 +95,8 @@ func main() {
kexAlgos []string
artifactRetentionTTL time.Duration
artifactRetentionRecords int
minRetryDelay time.Duration
maxRetryDelay time.Duration
)

flag.StringVar(&metricsAddr, "metrics-addr", envOrDefault("METRICS_ADDR", ":8080"),
Expand Down Expand Up @@ -130,6 +133,10 @@ func main() {
"The duration of time that artifacts will be kept in storage before being garbage collected.")
flag.IntVar(&artifactRetentionRecords, "artifact-retention-records", 2,
"The maximum number of artifacts to be kept in storage after a garbage collection.")
flag.DurationVar(&minRetryDelay, "min-retry-delay", 750*time.Millisecond,
"The minimum amount of time in which a reconciliation object will wait before a retry.")
flag.DurationVar(&maxRetryDelay, "max-retry-delay", 15*time.Minute,
"The maximum amount of time in which a reconciliation object will wait before a retry.")

clientOptions.BindFlags(flag.CommandLine)
logOptions.BindFlags(flag.CommandLine)
Expand Down Expand Up @@ -186,6 +193,8 @@ func main() {
storage := mustInitStorage(storagePath, storageAdvAddr, artifactRetentionTTL, artifactRetentionRecords, setupLog)
setPreferredKexAlgos(kexAlgos)

rateLimiter := workqueue.NewItemExponentialFailureRateLimiter(minRetryDelay, maxRetryDelay)

if err = (&controllers.GitRepositoryReconciler{
Client: mgr.GetClient(),
EventRecorder: eventRecorder,
Expand All @@ -195,6 +204,7 @@ func main() {
}).SetupWithManagerAndOptions(mgr, controllers.GitRepositoryReconcilerOptions{
MaxConcurrentReconciles: concurrent,
DependencyRequeueInterval: requeueDependency,
RateLimiter: rateLimiter,
}); err != nil {
setupLog.Error(err, "unable to create controller", "controller", sourcev1.GitRepositoryKind)
os.Exit(1)
Expand All @@ -208,6 +218,7 @@ func main() {
ControllerName: controllerName,
}).SetupWithManagerAndOptions(mgr, controllers.HelmRepositoryReconcilerOptions{
MaxConcurrentReconciles: concurrent,
RateLimiter: rateLimiter,
}); err != nil {
setupLog.Error(err, "unable to create controller", "controller", sourcev1.HelmRepositoryKind)
os.Exit(1)
Expand Down Expand Up @@ -241,6 +252,7 @@ func main() {
TTL: ttl,
}).SetupWithManagerAndOptions(mgr, controllers.HelmChartReconcilerOptions{
MaxConcurrentReconciles: concurrent,
RateLimiter: rateLimiter,
}); err != nil {
setupLog.Error(err, "unable to create controller", "controller", sourcev1.HelmChartKind)
os.Exit(1)
Expand All @@ -253,6 +265,7 @@ func main() {
ControllerName: controllerName,
}).SetupWithManagerAndOptions(mgr, controllers.BucketReconcilerOptions{
MaxConcurrentReconciles: concurrent,
RateLimiter: rateLimiter,
}); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "Bucket")
os.Exit(1)
Expand Down

0 comments on commit b02684b

Please sign in to comment.