Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix configsync for static -> dynamic -> static #54

Merged
merged 2 commits into from
Dec 8, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
123 changes: 110 additions & 13 deletions cmd/boskos/boskos.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package main

import (
"context"
"flag"
"fmt"
"net/http"
Expand All @@ -28,6 +29,13 @@ import (
"github.com/sirupsen/logrus"
"github.com/spf13/viper"
corev1 "k8s.io/api/core/v1"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"sigs.k8s.io/controller-runtime/pkg/source"

"k8s.io/test-infra/pkg/flagutil"
"k8s.io/test-infra/prow/config"
Expand All @@ -36,6 +44,7 @@ import (
"k8s.io/test-infra/prow/logrusutil"
prowmetrics "k8s.io/test-infra/prow/metrics"
"k8s.io/test-infra/prow/pjutil"
"sigs.k8s.io/boskos/common"
"sigs.k8s.io/boskos/crds"
"sigs.k8s.io/boskos/handlers"
"sigs.k8s.io/boskos/metrics"
Expand All @@ -49,9 +58,9 @@ const (
)

var (
configPath = flag.String("config", "config.yaml", "Path to init resource file")
dynamicResourceUpdatePeriod = flag.Duration("dynamic-resource-update-period", defaultDynamicResourceUpdatePeriod,
"Period at which to update dynamic resources. Set to 0 to disable.")
configPath = flag.String("config", "config.yaml", "Path to init resource file")
_ = flag.Duration("dynamic-resource-update-period", defaultDynamicResourceUpdatePeriod,
"Legacy flag that does nothing but is kept for compatibility reasons")
requestTTL = flag.Duration("request-ttl", defaultRequestTTL, "request TTL before losing priority in the queue")
logLevel = flag.String("log-level", "info", fmt.Sprintf("Log level is one of %v.", logrus.AllLevels))
namespace = flag.String("namespace", corev1.NamespaceDefault, "namespace to install on")
Expand Down Expand Up @@ -99,12 +108,28 @@ func main() {
// main server with the main mux until we're ready
health := pjutil.NewHealth()

client, err := kubeClientOptions.CacheBackedClient(*namespace, &crds.ResourceObject{}, &crds.DRLCObject{})
cfg, err := kubeClientOptions.Cfg()
if err != nil {
logrus.WithError(err).Fatal("unable to get client")
logrus.WithError(err).Fatal("Failed to get kubeconfig")
}
cfg.QPS = 100
cfg.Burst = 200
mgr, err := manager.New(cfg, manager.Options{
LeaderElection: false,
Namespace: *namespace,
MetricsBindAddress: "0",
})
if err != nil {
logrus.WithError(err).Fatal("Failed to construct mgr.")
}
interrupts.Run(func(ctx context.Context) {
if err := mgr.Start(ctx.Done()); err != nil {
logrus.WithError(err).Fatal("Mgr failed.")
}
logrus.Info("Mgr finished gracefully.")
})

storage := ranch.NewStorage(interrupts.Context(), client, *namespace)
storage := ranch.NewStorage(interrupts.Context(), mgr.GetClient(), *namespace)

r, err := ranch.NewRanch(*configPath, storage, *requestTTL)
if err != nil {
Expand All @@ -119,23 +144,26 @@ func main() {
// Viper defaults the configfile name to `config` and `SetConfigFile` only
// has an effect when the configfile name is not an empty string, so we
// just disable it entirely if there is no config.
configChangeEventChan := make(chan event.GenericEvent)
if *configPath != "" {
v := viper.New()
v.SetConfigFile(*configPath)
v.SetConfigType("yaml")
v.WatchConfig()
v.OnConfigChange(func(in fsnotify.Event) {
logrus.Infof("Updating Boskos Config")
if err := r.SyncConfig(*configPath); err != nil {
logrus.WithError(err).Errorf("Failed to update config")
} else {
logrus.Infof("Updated Boskos Config successfully")
}
logrus.Info("Boskos config file changed, updating config.")
configChangeEventChan <- event.GenericEvent{}
})
}

syncConfig := func() error {
return r.SyncConfig(*configPath)
}
if err := addConfigSyncReconcilerToManager(mgr, syncConfig, configChangeEventChan); err != nil {
logrus.WithError(err).Fatal("Failed to set up config sync controller")
}

prometheus.MustRegister(metrics.NewResourcesCollector(r))
r.StartDynamicResourceUpdater(*dynamicResourceUpdatePeriod)
r.StartRequestGC(defaultRequestGCPeriod)

logrus.Info("Start Service")
Expand All @@ -144,3 +172,72 @@ func main() {
// signal to the world that we're ready
health.ServeReady()
}

type configSyncReconciler struct {
ixdy marked this conversation as resolved.
Show resolved Hide resolved
sync func() error
}

func (r *configSyncReconciler) Reconcile(_ reconcile.Request) (reconcile.Result, error) {
err := r.sync()
if err != nil {
logrus.WithError(err).Error("Config sync failed")
}
return reconcile.Result{}, err
}

func addConfigSyncReconcilerToManager(mgr manager.Manager, configSync func() error, configChangeEvent <-chan event.GenericEvent) error {
ctrl, err := controller.New("bokos_config_reconciler", mgr, controller.Options{
// We reconcile the whole config, hence this is not safe to run concurrently
MaxConcurrentReconciles: 1,
Reconciler: &configSyncReconciler{
sync: configSync,
},
})
if err != nil {
return fmt.Errorf("failed to construct controller: %w", err)
}

if err := ctrl.Watch(&source.Kind{Type: &crds.ResourceObject{}}, constHandler(), resourceUpdatePredicate()); err != nil {
return fmt.Errorf("failed to watch boskos resources: %w", err)
}
if err := ctrl.Watch(&source.Kind{Type: &crds.DRLCObject{}}, constHandler()); err != nil {
return fmt.Errorf("failed to watch boskos dynamic resources: %w", err)
}
if err := ctrl.Watch(&source.Channel{Source: configChangeEvent}, constHandler()); err != nil {
return fmt.Errorf("failed to create source channel for config change event: %w", err)
}
if err := mgr.Add(ctrl); err != nil {
return fmt.Errorf("failed to add controller to manager: %w", err)
}

return nil
}

func constHandler() handler.EventHandler {
return &handler.EnqueueRequestsFromMapFunc{
ToRequests: handler.ToRequestsFunc(func(handler.MapObject) []reconcile.Request {
return []reconcile.Request{{}}
}),
}
}

// resourceUpdatePredicate prevents the config reconciler from reacting to resource update events
alvaroaleman marked this conversation as resolved.
Show resolved Hide resolved
// except if:
// * The new status is tombstone, because then we have to delete is
// * The new owner is empty, because then we have to delete it if it got deleted from the config but
// was not deleted from the api to let the current owner finish its work.
func resourceUpdatePredicate() predicate.Predicate {
return predicate.Funcs{
CreateFunc: func(_ event.CreateEvent) bool { return true },
DeleteFunc: func(_ event.DeleteEvent) bool { return true },
UpdateFunc: func(e event.UpdateEvent) bool {
resource, ok := e.ObjectNew.(*crds.ResourceObject)
if !ok {
panic(fmt.Sprintf("BUG: expected *crds.ResourceObject, got %T", e.ObjectNew))
}

return resource.Status.State == common.Tombstone || resource.Status.Owner == ""
},
GenericFunc: func(_ event.GenericEvent) bool { return true },
}
}
25 changes: 0 additions & 25 deletions ranch/ranch.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,12 +91,6 @@ func NewRanch(config string, s *Storage, ttl time.Duration) (*Ranch, error) {
requestMgr: NewRequestManager(ttl),
now: time.Now,
}
if config != "" {
if err := newRanch.SyncConfig(config); err != nil {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This races with the cache startup and will fail if the cache wasn't started in time

return nil, err
}
logrus.Infof("Loaded Boskos configuration successfully")
}
return newRanch, nil
}

Expand Down Expand Up @@ -410,25 +404,6 @@ func (r *Ranch) SyncConfig(configPath string) error {
return r.Storage.SyncResources(config)
}

// StartDynamicResourceUpdater starts a goroutine which periodically
// updates all dynamic resources.
func (r *Ranch) StartDynamicResourceUpdater(updatePeriod time.Duration) {
ixdy marked this conversation as resolved.
Show resolved Hide resolved
if updatePeriod == 0 {
return
}
go func() {
updateTick := time.NewTicker(updatePeriod).C
for {
select {
case <-updateTick:
if err := r.Storage.UpdateAllDynamicResources(); err != nil {
logrus.WithError(err).Error("UpdateAllDynamicResources failed")
}
}
}
}()
}

// StartRequestGC starts the GC of expired requests
func (r *Ranch) StartRequestGC(gcPeriod time.Duration) {
r.requestMgr.StartGC(gcPeriod)
Expand Down
Loading