diff --git a/cmd/main.go b/cmd/main.go
index fc15487ef..e606344b1 100644
--- a/cmd/main.go
+++ b/cmd/main.go
@@ -18,15 +18,10 @@ var (
 		Short: "Run Cluster Version Controller",
 		Long:  "",
 	}
-
-	rootOpts struct {
-		releaseImage string
-	}
 )
 
 func init() {
 	rootCmd.PersistentFlags().AddGoFlagSet(flag.CommandLine)
-	rootCmd.PersistentFlags().StringVar(&rootOpts.releaseImage, "release-image", "", "The Openshift release image url.")
 }
 
 func main() {
diff --git a/cmd/render.go b/cmd/render.go
index 4f994be9a..89ed5a9f0 100644
--- a/cmd/render.go
+++ b/cmd/render.go
@@ -18,13 +18,15 @@ var (
 	}
 
 	renderOpts struct {
-		outputDir string
+		releaseImage string
+		outputDir    string
 	}
 )
 
 func init() {
 	rootCmd.AddCommand(renderCmd)
 	renderCmd.PersistentFlags().StringVar(&renderOpts.outputDir, "output-dir", "", "The output directory where the manifests will be rendered.")
+	renderCmd.PersistentFlags().StringVar(&renderOpts.releaseImage, "release-image", "", "The OpenShift release image URL.")
 }
 
 func runRenderCmd(cmd *cobra.Command, args []string) {
@@ -34,10 +36,10 @@ func runRenderCmd(cmd *cobra.Command, args []string) {
 	if renderOpts.outputDir == "" {
 		glog.Fatalf("missing --output-dir flag, it is required")
 	}
-	if rootOpts.releaseImage == "" {
+	if renderOpts.releaseImage == "" {
 		glog.Fatalf("missing --release-image flag, it is required")
 	}
-	if err := cvo.Render(renderOpts.outputDir, rootOpts.releaseImage); err != nil {
+	if err := cvo.Render(renderOpts.outputDir, renderOpts.releaseImage); err != nil {
 		glog.Fatalf("Render command failed: %v", err)
 	}
 }
diff --git a/cmd/start.go b/cmd/start.go
index 15495a7d7..264e8b712 100644
--- a/cmd/start.go
+++ b/cmd/start.go
@@ -2,284 +2,36 @@ package main
 
 import (
 	"flag"
-	"fmt"
-	"math/rand"
-	"net/http"
-	"os"
-	"time"
 
 	"github.com/golang/glog"
-	"github.com/google/uuid"
-	clientset "github.com/openshift/client-go/config/clientset/versioned"
-	informers "github.com/openshift/client-go/config/informers/externalversions"
-	"github.com/openshift/cluster-version-operator/pkg/autoupdate"
-	"github.com/openshift/cluster-version-operator/pkg/cvo"
+	"github.com/openshift/cluster-version-operator/pkg/start"
 	"github.com/openshift/cluster-version-operator/pkg/version"
-	"github.com/prometheus/client_golang/prometheus/promhttp"
 	"github.com/spf13/cobra"
-	v1 "k8s.io/api/core/v1"
-	apiext "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
-	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
-	"k8s.io/apimachinery/pkg/runtime"
-	"k8s.io/client-go/kubernetes"
-	"k8s.io/client-go/rest"
-	"k8s.io/client-go/tools/clientcmd"
-	"k8s.io/client-go/tools/leaderelection"
-	"k8s.io/client-go/tools/leaderelection/resourcelock"
-	"k8s.io/client-go/tools/record"
 )
 
-const (
-	minResyncPeriod = 2 * time.Minute
-
-	leaseDuration = 90 * time.Second
-	renewDeadline = 45 * time.Second
-	retryPeriod   = 30 * time.Second
-)
-
-var (
-	startCmd = &cobra.Command{
+func init() {
+	opts := start.NewOptions()
+	cmd := &cobra.Command{
 		Use:   "start",
 		Short: "Starts Cluster Version Operator",
 		Long:  "",
-		Run:   runStartCmd,
-	}
-
-	startOpts struct {
-		// name is provided for testing only to allow multiple CVO's to be running at once
-		name string
-		// namespace is provided for testing only
-		namespace string
-
-		kubeconfig string
-		nodeName   string
-		listenAddr string
-
-		enableAutoUpdate bool
-	}
-)
-
-func init() {
-	rootCmd.AddCommand(startCmd)
-	startCmd.PersistentFlags().StringVar(&startOpts.listenAddr, "listen", "0.0.0.0:9099", "Address to listen on for metrics")
-	startCmd.PersistentFlags().StringVar(&startOpts.kubeconfig, "kubeconfig", "", "Kubeconfig file to access a remote cluster (testing only)")
-	startCmd.PersistentFlags().StringVar(&startOpts.nodeName, "node-name", "", "kubernetes node name CVO is scheduled on.")
-	startCmd.PersistentFlags().BoolVar(&startOpts.enableAutoUpdate, "enable-auto-update", true, "Enables the autoupdate controller.")
-}
-
-func runStartCmd(cmd *cobra.Command, args []string) {
-	flag.Set("logtostderr", "true")
-	flag.Parse()
-
-	// To help debugging, immediately log version
-	glog.Infof("%s", version.String)
-
-	if startOpts.nodeName == "" {
-		name, ok := os.LookupEnv("NODE_NAME")
-		if !ok || name == "" {
-			glog.Fatalf("node-name is required")
-		}
-		startOpts.nodeName = name
-	}
-
-	// exposed for end-to-end testing only
-	startOpts.name = os.Getenv("CVO_NAME")
-	if len(startOpts.name) == 0 {
-		startOpts.name = componentName
-	}
-	startOpts.namespace = os.Getenv("CVO_NAMESPACE")
-	if len(startOpts.name) == 0 {
-		startOpts.namespace = componentNamespace
-	}
+		Run: func(cmd *cobra.Command, args []string) {
+			flag.Set("logtostderr", "true")
+			flag.Parse()
 
-	if rootOpts.releaseImage == "" {
-		glog.Fatalf("missing --release-image flag, it is required")
-	}
+			// To help debugging, immediately log version
+			glog.Infof("%s", version.String)
 
-	if len(startOpts.listenAddr) > 0 {
-		mux := http.NewServeMux()
-		mux.Handle("/metrics", promhttp.Handler())
-		go func() {
-			if err := http.ListenAndServe(startOpts.listenAddr, mux); err != nil {
-				glog.Fatalf("Unable to start metrics server: %v", err)
+			if err := opts.Run(); err != nil {
+				glog.Fatalf("error: %v", err)
 			}
-		}()
-	}
-
-	cb, err := newClientBuilder(startOpts.kubeconfig)
-	if err != nil {
-		glog.Fatalf("error creating clients: %v", err)
-	}
-	stopCh := make(chan struct{})
-	run := func(stop <-chan struct{}) {
-
-		ctx := createControllerContext(cb, startOpts.name, stopCh)
-		if err := startControllers(ctx); err != nil {
-			glog.Fatalf("error starting controllers: %v", err)
-		}
-
-		ctx.CVInformerFactory.Start(ctx.Stop)
-		ctx.InformerFactory.Start(ctx.Stop)
-		close(ctx.InformersStarted)
-
-		select {}
-	}
-
-	leaderelection.RunOrDie(leaderelection.LeaderElectionConfig{
-		Lock:          createResourceLock(cb),
-		LeaseDuration: leaseDuration,
-		RenewDeadline: renewDeadline,
-		RetryPeriod:   retryPeriod,
-		Callbacks: leaderelection.LeaderCallbacks{
-			OnStartedLeading: run,
-			OnStoppedLeading: func() {
-				glog.Fatalf("leaderelection lost")
-			},
-		},
-	})
-	panic("unreachable")
-}
-
-func createResourceLock(cb *clientBuilder) resourcelock.Interface {
-	recorder := record.
-		NewBroadcaster().
-		NewRecorder(runtime.NewScheme(), v1.EventSource{Component: componentName})
-
-	id, err := os.Hostname()
-	if err != nil {
-		glog.Fatalf("error creating lock: %v", err)
-	}
-
-	uuid, err := uuid.NewRandom()
-	if err != nil {
-		glog.Fatalf("Failed to generate UUID: %v", err)
-	}
-
-	// add a uniquifier so that two processes on the same host don't accidentally both become active
-	id = id + "_" + uuid.String()
-
-	return &resourcelock.ConfigMapLock{
-		ConfigMapMeta: metav1.ObjectMeta{
-			Namespace: componentNamespace,
-			Name:      componentName,
 		},
-		Client: cb.KubeClientOrDie("leader-election").CoreV1(),
-		LockConfig: resourcelock.ResourceLockConfig{
-			Identity:      id,
-			EventRecorder: recorder,
-		},
-	}
-}
-
-func resyncPeriod() func() time.Duration {
-	return func() time.Duration {
-		factor := rand.Float64() + 1
-		return time.Duration(float64(minResyncPeriod.Nanoseconds()) * factor)
-	}
-}
-
-type clientBuilder struct {
-	config *rest.Config
-}
-
-func (cb *clientBuilder) RestConfig() *rest.Config {
-	c := rest.CopyConfig(cb.config)
-	return c
-}
-
-func (cb *clientBuilder) ClientOrDie(name string) clientset.Interface {
-	return clientset.NewForConfigOrDie(rest.AddUserAgent(cb.config, name))
-}
-
-func (cb *clientBuilder) KubeClientOrDie(name string) kubernetes.Interface {
-	return kubernetes.NewForConfigOrDie(rest.AddUserAgent(cb.config, name))
-}
-
-func (cb *clientBuilder) APIExtClientOrDie(name string) apiext.Interface {
-	return apiext.NewForConfigOrDie(rest.AddUserAgent(cb.config, name))
-}
-
-func newClientBuilder(kubeconfig string) (*clientBuilder, error) {
-	var config *rest.Config
-	var err error
-
-	if kubeconfig != "" {
-		glog.V(4).Infof("Loading kube client config from path %q", kubeconfig)
-		config, err = clientcmd.BuildConfigFromFlags("", kubeconfig)
-	} else {
-		glog.V(4).Infof("Using in-cluster kube client config")
-		config, err = rest.InClusterConfig()
-	}
-	if err != nil {
-		return nil, err
-	}
-
-	return &clientBuilder{
-		config: config,
-	}, nil
-}
-
-type controllerContext struct {
-	ClientBuilder *clientBuilder
-
-	CVInformerFactory informers.SharedInformerFactory
-	InformerFactory   informers.SharedInformerFactory
-
-	Stop <-chan struct{}
-
-	InformersStarted chan struct{}
-
-	ResyncPeriod func() time.Duration
-}
-
-func createControllerContext(cb *clientBuilder, name string, stop <-chan struct{}) *controllerContext {
-	client := cb.ClientOrDie("shared-informer")
-
-	cvInformer := informers.NewFilteredSharedInformerFactory(client, resyncPeriod()(), "", func(opts *metav1.ListOptions) {
-		opts.FieldSelector = fmt.Sprintf("metadata.name=%s", name)
-	})
-	sharedInformers := informers.NewSharedInformerFactory(client, resyncPeriod()())
-
-	return &controllerContext{
-		ClientBuilder:     cb,
-		CVInformerFactory: cvInformer,
-		InformerFactory:   sharedInformers,
-		Stop:              stop,
-		InformersStarted:  make(chan struct{}),
-		ResyncPeriod:      resyncPeriod(),
-	}
-}
-
-func startControllers(ctx *controllerContext) error {
-	overrideDirectory := os.Getenv("PAYLOAD_OVERRIDE")
-	if len(overrideDirectory) > 0 {
-		glog.Warningf("Using an override payload directory for testing only: %s", overrideDirectory)
-	}
-
-	go cvo.New(
-		startOpts.nodeName,
-		startOpts.namespace, startOpts.name,
-		rootOpts.releaseImage,
-		overrideDirectory,
-		ctx.ResyncPeriod(),
-		ctx.CVInformerFactory.Config().V1().ClusterVersions(),
-		ctx.InformerFactory.Config().V1().ClusterOperators(),
-		ctx.ClientBuilder.RestConfig(),
-		ctx.ClientBuilder.ClientOrDie(componentName),
-		ctx.ClientBuilder.KubeClientOrDie(componentName),
-		ctx.ClientBuilder.APIExtClientOrDie(componentName),
-		true,
-	).Run(2, ctx.Stop)
-
-	if startOpts.enableAutoUpdate {
-		go autoupdate.New(
-			componentNamespace, componentName,
-			ctx.CVInformerFactory.Config().V1().ClusterVersions(),
-			ctx.InformerFactory.Config().V1().ClusterOperators(),
-			ctx.ClientBuilder.ClientOrDie(componentName),
-			ctx.ClientBuilder.KubeClientOrDie(componentName),
-		).Run(2, ctx.Stop)
 	}
-	return nil
+	cmd.PersistentFlags().StringVar(&opts.ListenAddr, "listen", opts.ListenAddr, "Address to listen on for metrics")
+	cmd.PersistentFlags().StringVar(&opts.Kubeconfig, "kubeconfig", opts.Kubeconfig, "Kubeconfig file to access a remote cluster (testing only)")
+	cmd.PersistentFlags().StringVar(&opts.NodeName, "node-name", opts.NodeName, "Kubernetes node name CVO is scheduled on.")
+	cmd.PersistentFlags().BoolVar(&opts.EnableAutoUpdate, "enable-auto-update", opts.EnableAutoUpdate, "Enables the autoupdate controller.")
+	cmd.PersistentFlags().StringVar(&opts.ReleaseImage, "release-image", opts.ReleaseImage, "The OpenShift release image URL.")
+	rootCmd.AddCommand(cmd)
 }
diff --git a/pkg/autoupdate/autoupdate.go b/pkg/autoupdate/autoupdate.go
index 7110c225d..374a23e48 100644
--- a/pkg/autoupdate/autoupdate.go
+++ b/pkg/autoupdate/autoupdate.go
@@ -8,7 +8,7 @@ import (
 
 	"github.com/blang/semver"
 	"github.com/golang/glog"
-	"github.com/openshift/api/config/v1"
+	v1 "github.com/openshift/api/config/v1"
 	clientset "github.com/openshift/client-go/config/clientset/versioned"
 	"github.com/openshift/client-go/config/clientset/versioned/scheme"
 	configinformersv1 "github.com/openshift/client-go/config/informers/externalversions/config/v1"
@@ -65,7 +65,7 @@ func New(
 ) *Controller {
 	eventBroadcaster := record.NewBroadcaster()
 	eventBroadcaster.StartLogging(glog.Infof)
-	eventBroadcaster.StartRecordingToSink(&coreclientsetv1.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")})
+	eventBroadcaster.StartRecordingToSink(&coreclientsetv1.EventSinkImpl{Interface: kubeClient.CoreV1().Events(namespace)})
 
 	ctrl := &Controller{
 		namespace:  namespace,
diff --git a/pkg/cvo/cvo.go b/pkg/cvo/cvo.go
index 72e66ac41..ff1510af3 100644
--- a/pkg/cvo/cvo.go
+++ b/pkg/cvo/cvo.go
@@ -137,7 +137,7 @@ func New(
 ) *Operator {
 	eventBroadcaster := record.NewBroadcaster()
 	eventBroadcaster.StartLogging(glog.Infof)
-	eventBroadcaster.StartRecordingToSink(&coreclientsetv1.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")})
+	eventBroadcaster.StartRecordingToSink(&coreclientsetv1.EventSinkImpl{Interface: kubeClient.CoreV1().Events(namespace)})
 
 	optr := &Operator{
 		nodename:     nodename,
@@ -162,7 +162,7 @@ func New(
 
 	optr.configSync = NewSyncWorker(
 		optr.defaultPayloadRetriever(),
-		optr.defaultResourceBuilder(),
+		NewResourceBuilder(optr.restConfig),
 		minimumInterval,
 		wait.Backoff{
 			Duration: time.Second * 10,
@@ -452,3 +452,8 @@ func (optr *Operator) currentVersion() configv1.Update {
 		Payload: optr.releaseImage,
 	}
 }
+
+// SetSyncWorkerForTesting updates the sync worker for whitebox testing.
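+// It is exported only so tests (for example the integration tests in
+// pkg/start) can inject a stub worker before Run is called; a hypothetical
+// unit-test usage, borrowing the fakeSyncRecorder from cvo_test.go, is:
+//
+//	optr.SetSyncWorkerForTesting(&fakeSyncRecorder{Returns: &SyncWorkerStatus{}})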
+func (optr *Operator) SetSyncWorkerForTesting(worker ConfigSyncWorker) {
+	optr.configSync = worker
+}
diff --git a/pkg/cvo/cvo_test.go b/pkg/cvo/cvo_test.go
index 17430a6ae..499e5a407 100644
--- a/pkg/cvo/cvo_test.go
+++ b/pkg/cvo/cvo_test.go
@@ -1,15 +1,11 @@
 package cvo
 
 import (
-	"context"
 	"fmt"
-	"io/ioutil"
 	"net/http"
 	"net/http/httptest"
 	"os"
-	"path/filepath"
 	"reflect"
-	"regexp"
 	"strconv"
 	"testing"
 	"time"
@@ -184,52 +180,9 @@ func (c *fakeApiExtClient) Patch(name string, pt types.PatchType, data []byte, s
 
 func TestOperator_sync(t *testing.T) {
 	id := uuid.Must(uuid.NewRandom()).String()
-	content1 := map[string]interface{}{
-		"manifests": map[string]interface{}{},
-		"release-manifests": map[string]interface{}{
-			"image-references": `
-			{
-				"kind": "ImageStream",
-				"apiVersion": "image.openshift.io/v1",
-				"metadata": {
-					"name": "0.0.1-abc"
-				}
-			}
-			`,
-		},
-	}
-	// contentWithoutManifests := map[string]interface{}{
-	// 	"release-manifests": map[string]interface{}{
-	// 		"image-references": `
-	// 		{
-	// 			"kind": "ImageStream",
-	// 			"apiVersion": "image.openshift.io/v1",
-	// 			"metadata": {
-	// 				"name": "0.0.1-abc"
-	// 			}
-	// 		}
-	// 		`,
-	// 	},
-	// }
-	// content_4_0_1 := map[string]interface{}{
-	// 	"manifests": map[string]interface{}{},
-	// 	"release-manifests": map[string]interface{}{
-	// 		"image-references": `
-	// 		{
-	// 			"kind": "ImageStream",
-	// 			"apiVersion": "image.openshift.io/v1",
-	// 			"metadata": {
-	// 				"name": "4.0.1"
-	// 			}
-	// 		}
-	// 		`,
-	// 	},
-	// }
-
 	tests := []struct {
 		name       string
 		key        string
-		content    map[string]interface{}
 		syncStatus *SyncWorkerStatus
 		optr       Operator
 		init       func(optr *Operator)
@@ -239,8 +192,7 @@ func TestOperator_sync(t *testing.T) {
 		wantSync []configv1.Update
 	}{
 		{
-			name:    "create version and status",
-			content: content1,
+			name: "create version and status",
 			optr: Operator{
 				releaseVersion: "4.0.1",
 				releaseImage:   "payload/image:v4.0.1",
@@ -338,8 +290,7 @@ func TestOperator_sync(t *testing.T) {
 			},
 		},
 		{
-			name:    "progressing and previously failed, reconciling",
-			content: content1,
+			name: "progressing and previously failed, reconciling",
 			optr: Operator{
 				releaseVersion: "4.0.1",
 				releaseImage:   "payload/image:v4.0.1",
@@ -412,8 +363,7 @@ func TestOperator_sync(t *testing.T) {
 			},
 		},
 		{
-			name:    "progressing and previously failed, reconciling and multiple completions",
-			content: content1,
+			name: "progressing and previously failed, reconciling and multiple completions",
 			optr: Operator{
 				releaseVersion: "4.0.1",
 				releaseImage:   "payload/image:v4.0.1",
@@ -487,8 +437,7 @@ func TestOperator_sync(t *testing.T) {
 			},
 		},
 		{
-			name:    "progressing and encounters error during payload sync",
-			content: content1,
+			name: "progressing and encounters error during payload sync",
 			optr: Operator{
 				releaseVersion: "4.0.1",
 				releaseImage:   "payload/image:v4.0.1",
@@ -1903,14 +1852,6 @@ func TestOperator_sync(t *testing.T) {
 			}
 			optr.cvLister = &clientCVLister{client: optr.client}
 			optr.clusterOperatorLister = &clientCOLister{client: optr.client}
-			dir, err := ioutil.TempDir("", "cvo-test")
-			if err != nil {
-				t.Fatal(err)
-			}
-			defer os.RemoveAll(dir)
-			if err := createContent(dir, tt.content); err != nil {
-				t.Fatal(err)
-			}
 			if optr.configSync == nil {
 				expectStatus := tt.syncStatus
 				if expectStatus == nil {
@@ -1919,7 +1860,7 @@ func TestOperator_sync(t *testing.T) {
 				optr.configSync = &fakeSyncRecorder{Returns: expectStatus}
 			}
 
-			err = optr.sync(optr.queueKey())
+			err := optr.sync(optr.queueKey())
 			if err != nil && tt.wantErr == nil {
 				t.Fatalf("Operator.sync() unexpected error: %v", err)
 			}
@@ -2283,77 +2224,6 @@ func TestOperator_availableUpdatesSync(t *testing.T) {
 	}
 }
 
-var reVariable = regexp.MustCompile(`\$\([a-zA-Z0-9_\-]+\)`)
-
-func TestCreateContentReplacement(t *testing.T) {
-	replacements := []map[string]string{
-		{"NS": "other"},
-	}
-	in := `Some stuff $(NS) that should be $(NS)`
-	out := reVariable.ReplaceAllStringFunc(in, func(key string) string {
-		key = key[2 : len(key)-1]
-		for _, r := range replacements {
-			v, ok := r[key]
-			if !ok {
-				continue
-			}
-			return v
-		}
-		return key
-	})
-	if out != `Some stuff other that should be other` {
-		t.Fatal(out)
-	}
-}
-
-func createContent(baseDir string, content map[string]interface{}, replacements ...map[string]string) error {
-	if err := os.MkdirAll(baseDir, 0750); err != nil {
-		return err
-	}
-	for k, v := range content {
-		switch t := v.(type) {
-		case string:
-			if len(replacements) > 0 {
-				t = reVariable.ReplaceAllStringFunc(t, func(key string) string {
-					key = key[2 : len(key)-1]
-					for _, r := range replacements {
-						v, ok := r[key]
-						if !ok {
-							continue
-						}
-						return v
-					}
-					return key
-				})
-			}
-			if err := ioutil.WriteFile(filepath.Join(baseDir, k), []byte(t), 0640); err != nil {
-				return err
-			}
-		case map[string]interface{}:
-			dir := filepath.Join(baseDir, k)
-			if err := os.Mkdir(dir, 0750); err != nil {
-				return err
-			}
-			if err := createContent(dir, t, replacements...); err != nil {
-				return err
-			}
-		}
-	}
-	return nil
-}
-
-type mapPayloadRetriever struct {
-	Paths map[string]string
-}
-
-func (r *mapPayloadRetriever) RetrievePayload(ctx context.Context, update configv1.Update) (string, error) {
-	path, ok := r.Paths[update.Payload]
-	if !ok {
-		return "", fmt.Errorf("no payload found for %q", update.Payload)
-	}
-	return path, nil
-}
-
 func expectGet(t *testing.T, a ktesting.Action, resource, namespace, name string) {
 	t.Helper()
 	if "get" != a.GetVerb() {
diff --git a/pkg/cvo/sync.go b/pkg/cvo/sync.go
index 26e816b68..35422d5c1 100644
--- a/pkg/cvo/sync.go
+++ b/pkg/cvo/sync.go
@@ -36,12 +36,6 @@ var requeueOnErrorCauseToCheck = map[string]func(error) bool{
 	RequeueOnErrorCauseNoMatch: meta.IsNoMatchError,
 }
 
-func (optr *Operator) defaultResourceBuilder() ResourceBuilder {
-	return &resourceBuilder{
-		config: optr.restConfig,
-	}
-}
-
 // resourceBuilder provides the default builder implementation for the operator.
 // It is abstracted for testing.
 type resourceBuilder struct {
@@ -49,6 +43,11 @@ type resourceBuilder struct {
 	modifier resourcebuilder.MetaV1ObjectModifierFunc
 }
 
+// NewResourceBuilder creates the default resource builder implementation.
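+// It is exported so tests outside this file can build a SyncWorker without
+// constructing an Operator; a nil config is acceptable when no manifest is
+// actually applied, as sync_test.go does below:
+//
+//	worker.builder = NewResourceBuilder(nil)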
+func NewResourceBuilder(config *rest.Config) ResourceBuilder {
+	return &resourceBuilder{config: config}
+}
+
 func (b *resourceBuilder) BuilderFor(m *lib.Manifest) (resourcebuilder.Interface, error) {
 	if resourcebuilder.Mapper.Exists(m.GVK) {
 		return resourcebuilder.New(resourcebuilder.Mapper, b.config, *m)
diff --git a/pkg/cvo/sync_test.go b/pkg/cvo/sync_test.go
index 58af557ce..74d403d1a 100644
--- a/pkg/cvo/sync_test.go
+++ b/pkg/cvo/sync_test.go
@@ -381,7 +381,7 @@ func Test_SyncWorker_apply(t *testing.T) {
 
 			worker := &SyncWorker{}
 			worker.backoff.Steps = 3
-			worker.builder = (&Operator{}).defaultResourceBuilder()
+			worker.builder = NewResourceBuilder(nil)
 			ctx := context.Background()
 			worker.apply(ctx, up, &SyncWork{}, &statusWrapper{w: worker, previousStatus: worker.Status()})
 			test.check(t, r.actions)
diff --git a/pkg/start/start.go b/pkg/start/start.go
new file mode 100644
index 000000000..c5761d1b6
--- /dev/null
+++ b/pkg/start/start.go
@@ -0,0 +1,320 @@
+package start
+
+import (
+	"context"
+	"fmt"
+	"math/rand"
+	"net/http"
+	"os"
+	"os/signal"
+	"syscall"
+	"time"
+
+	"github.com/golang/glog"
+	"github.com/google/uuid"
+
+	clientset "github.com/openshift/client-go/config/clientset/versioned"
+	informers "github.com/openshift/client-go/config/informers/externalversions"
+	"github.com/openshift/cluster-version-operator/pkg/autoupdate"
+	"github.com/openshift/cluster-version-operator/pkg/cvo"
+	"github.com/prometheus/client_golang/prometheus/promhttp"
+	v1 "k8s.io/api/core/v1"
+	apiext "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/client-go/kubernetes"
+	"k8s.io/client-go/kubernetes/scheme"
+	coreclientsetv1 "k8s.io/client-go/kubernetes/typed/core/v1"
+	"k8s.io/client-go/rest"
+	"k8s.io/client-go/tools/clientcmd"
+	"k8s.io/client-go/tools/leaderelection"
+	"k8s.io/client-go/tools/leaderelection/resourcelock"
+	"k8s.io/client-go/tools/record"
+)
+
+const (
+	defaultComponentName      = "version"
+	defaultComponentNamespace = "openshift-cluster-version"
+
+	minResyncPeriod = 2 * time.Minute
+
+	leaseDuration = 90 * time.Second
+	renewDeadline = 45 * time.Second
+	retryPeriod   = 30 * time.Second
+)
+
+type Options struct {
+	ReleaseImage string
+
+	Kubeconfig string
+	NodeName   string
+	ListenAddr string
+
+	EnableAutoUpdate bool
+
+	// for testing only
+	Name            string
+	Namespace       string
+	PayloadOverride string
+	EnableMetrics   bool
+	ResyncInterval  time.Duration
+}
+
+func defaultEnv(name, defaultValue string) string {
+	env, ok := os.LookupEnv(name)
+	if !ok {
+		return defaultValue
+	}
+	return env
+}
+
+func NewOptions() *Options {
+	return &Options{
+		ListenAddr: "0.0.0.0:9099",
+		NodeName:   os.Getenv("NODE_NAME"),
+
+		// exposed only for testing
+		Namespace:       defaultEnv("CVO_NAMESPACE", defaultComponentNamespace),
+		Name:            defaultEnv("CVO_NAME", defaultComponentName),
+		PayloadOverride: os.Getenv("PAYLOAD_OVERRIDE"),
+		ResyncInterval:  minResyncPeriod,
+		EnableMetrics:   true,
+	}
+}
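+
+// Run validates the options, acquires the leader election lock, and drives
+// the controllers until SIGTERM/SIGINT arrives. A hypothetical local
+// invocation (image and paths illustrative only) might look like:
+//
+//	NODE_NAME=dev-node cluster-version-operator start \
+//	    --release-image registry.example/release:4.0.1 \
+//	    --kubeconfig ~/.kube/config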
+func (o *Options) Run() error {
+	if o.NodeName == "" {
+		return fmt.Errorf("node-name is required")
+	}
+	if o.ReleaseImage == "" {
+		return fmt.Errorf("missing --release-image flag, it is required")
+	}
+	if len(o.PayloadOverride) > 0 {
+		glog.Warningf("Using an override payload directory for testing only: %s", o.PayloadOverride)
+	}
+
+	// initialize the core objects
+	cb, err := newClientBuilder(o.Kubeconfig)
+	if err != nil {
+		return fmt.Errorf("error creating clients: %v", err)
+	}
+	lock, err := createResourceLock(cb, o.Namespace, o.Name)
+	if err != nil {
+		return err
+	}
+	controllerCtx := o.NewControllerContext(cb)
+
+	// TODO: Kube 1.14 will contain a ReleaseOnCancel boolean on
+	// LeaderElectionConfig that allows us to have the lock code
+	// release the lease when this context is cancelled. At that
+	// time we can remove our changes to OnStartedLeading.
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+	ch := make(chan os.Signal, 1)
+	defer func() { signal.Stop(ch) }()
+	signal.Notify(ch, os.Interrupt, syscall.SIGTERM)
+	go func() {
+		sig := <-ch
+		glog.Infof("Shutting down due to %s", sig)
+		cancel()
+
+		// exit after 2s no matter what
+		select {
+		case <-time.After(2 * time.Second):
+			glog.Fatalf("Exiting")
+		case <-ch:
+			glog.Fatalf("Received shutdown signal twice, exiting")
+		}
+	}()
+
+	o.run(ctx, controllerCtx, lock)
+	return nil
+}
+
+func (o *Options) run(ctx context.Context, controllerCtx *Context, lock *resourcelock.ConfigMapLock) {
+	// listen on metrics
+	if len(o.ListenAddr) > 0 {
+		mux := http.NewServeMux()
+		mux.Handle("/metrics", promhttp.Handler())
+		go func() {
+			if err := http.ListenAndServe(o.ListenAddr, mux); err != nil {
+				glog.Fatalf("Unable to start metrics server: %v", err)
+			}
+		}()
+	}
+
+	exit := make(chan struct{})
+
+	// TODO: when we switch to graceful lock shutdown, this can be
+	// moved back inside RunOrDie
+	go leaderelection.RunOrDie(leaderelection.LeaderElectionConfig{
+		Lock:          lock,
+		LeaseDuration: leaseDuration,
+		RenewDeadline: renewDeadline,
+		RetryPeriod:   retryPeriod,
+		Callbacks: leaderelection.LeaderCallbacks{
+			OnStartedLeading: func(stop <-chan struct{}) {
+				controllerCtx.Start(ctx.Done())
+				select {
+				case <-ctx.Done():
+					// WARNING: this is not completely safe until we have Kube 1.14 and ReleaseOnCancel
+					// and client-go ContextCancelable, which allows us to block new API requests before
+					// we step down. However, the CVO isn't that sensitive to races and can tolerate
+					// brief overlap.
+					glog.Infof("Stepping down as leader")
+					// give the controllers some time to shut down
+					time.Sleep(100 * time.Millisecond)
+					// if we still hold the leader lease, clear the owner identity (other lease watchers
+					// still have to wait for expiration) like the new ReleaseOnCancel code will do.
+					if err := lock.Update(resourcelock.LeaderElectionRecord{}); err == nil {
+						// if we successfully clear the owner identity, we can safely delete the record
+						if err := lock.Client.ConfigMaps(lock.ConfigMapMeta.Namespace).Delete(lock.ConfigMapMeta.Name, nil); err != nil {
+							glog.Warningf("Unable to step down cleanly: %v", err)
+						}
+					}
+					glog.Infof("Finished shutdown")
+					close(exit)
+				case <-stop:
+					// we will exit in OnStoppedLeading
+				}
+			},
+			OnStoppedLeading: func() {
+				glog.Warning("leaderelection lost")
+				close(exit)
+			},
+		},
+	})
+
+	<-exit
+}
+
+// createResourceLock initializes the lock.
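+// The lock is a ConfigMap named for the operator in its own namespace; the
+// holder identity is the hostname plus a random UUID so that two processes
+// on the same host never share an identity, and leadership changes are
+// recorded as events in the same namespace.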
+func createResourceLock(cb *ClientBuilder, namespace, name string) (*resourcelock.ConfigMapLock, error) {
+	client := cb.KubeClientOrDie("leader-election")
+	eventBroadcaster := record.NewBroadcaster()
+	eventBroadcaster.StartLogging(glog.Infof)
+	eventBroadcaster.StartRecordingToSink(&coreclientsetv1.EventSinkImpl{Interface: client.CoreV1().Events(namespace)})
+
+	id, err := os.Hostname()
+	if err != nil {
+		return nil, fmt.Errorf("error creating lock: %v", err)
+	}
+
+	uuid, err := uuid.NewRandom()
+	if err != nil {
+		return nil, fmt.Errorf("failed to generate UUID: %v", err)
+	}
+
+	// add a uniquifier so that two processes on the same host don't accidentally both become active
+	id = id + "_" + uuid.String()
+
+	return &resourcelock.ConfigMapLock{
+		ConfigMapMeta: metav1.ObjectMeta{
+			Namespace: namespace,
+			Name:      name,
+		},
+		Client: client.CoreV1(),
+		LockConfig: resourcelock.ResourceLockConfig{
+			Identity:      id,
+			EventRecorder: eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: namespace}),
+		},
+	}, nil
+}
+
+func resyncPeriod(minResyncPeriod time.Duration) func() time.Duration {
+	return func() time.Duration {
+		factor := rand.Float64() + 1
+		return time.Duration(float64(minResyncPeriod.Nanoseconds()) * factor)
+	}
+}
+
+type ClientBuilder struct {
+	config *rest.Config
+}
+
+func (cb *ClientBuilder) RestConfig() *rest.Config {
+	c := rest.CopyConfig(cb.config)
+	return c
+}
+
+func (cb *ClientBuilder) ClientOrDie(name string) clientset.Interface {
+	return clientset.NewForConfigOrDie(rest.AddUserAgent(cb.config, name))
+}
+
+func (cb *ClientBuilder) KubeClientOrDie(name string) kubernetes.Interface {
+	return kubernetes.NewForConfigOrDie(rest.AddUserAgent(cb.config, name))
+}
+
+func (cb *ClientBuilder) APIExtClientOrDie(name string) apiext.Interface {
+	return apiext.NewForConfigOrDie(rest.AddUserAgent(cb.config, name))
+}
+
+func newClientBuilder(kubeconfig string) (*ClientBuilder, error) {
+	clientCfg := clientcmd.NewDefaultClientConfigLoadingRules()
+	clientCfg.ExplicitPath = kubeconfig
+
+	kcfg := clientcmd.NewNonInteractiveDeferredLoadingClientConfig(clientCfg, &clientcmd.ConfigOverrides{})
+	config, err := kcfg.ClientConfig()
+	if err != nil {
+		return nil, err
+	}
+
+	return &ClientBuilder{
+		config: config,
+	}, nil
+}
+
+type Context struct {
+	CVO        *cvo.Operator
+	AutoUpdate *autoupdate.Controller
+
+	CVInformerFactory informers.SharedInformerFactory
+	InformerFactory   informers.SharedInformerFactory
+}
+
+func (o *Options) NewControllerContext(cb *ClientBuilder) *Context {
+	client := cb.ClientOrDie("shared-informer")
+
+	cvInformer := informers.NewFilteredSharedInformerFactory(client, resyncPeriod(o.ResyncInterval)(), "", func(opts *metav1.ListOptions) {
+		opts.FieldSelector = fmt.Sprintf("metadata.name=%s", o.Name)
+	})
+	sharedInformers := informers.NewSharedInformerFactory(client, resyncPeriod(o.ResyncInterval)())
+
+	ctx := &Context{
+		CVInformerFactory: cvInformer,
+		InformerFactory:   sharedInformers,
+
+		CVO: cvo.New(
+			o.NodeName,
+			o.Namespace, o.Name,
+			o.ReleaseImage,
+			o.PayloadOverride,
+			resyncPeriod(o.ResyncInterval)(),
+			cvInformer.Config().V1().ClusterVersions(),
+			sharedInformers.Config().V1().ClusterOperators(),
+			cb.RestConfig(),
+			cb.ClientOrDie(o.Namespace),
+			cb.KubeClientOrDie(o.Namespace),
+			cb.APIExtClientOrDie(o.Namespace),
+			o.EnableMetrics,
+		),
+	}
+	if o.EnableAutoUpdate {
+		ctx.AutoUpdate = autoupdate.New(
+			o.Namespace, o.Name,
+			cvInformer.Config().V1().ClusterVersions(),
+			sharedInformers.Config().V1().ClusterOperators(),
+			cb.ClientOrDie(o.Namespace),
+			cb.KubeClientOrDie(o.Namespace),
+		)
+	}
+	return ctx
+}
+
+func (ctx *Context) Start(ch <-chan struct{}) {
+	go ctx.CVO.Run(2, ch)
+	if ctx.AutoUpdate != nil {
+		go ctx.AutoUpdate.Run(2, ch)
+	}
+	ctx.CVInformerFactory.Start(ch)
+	ctx.InformerFactory.Start(ch)
+}
diff --git a/pkg/cvo/cvo_integration_test.go b/pkg/start/start_integration_test.go
similarity index 78%
rename from pkg/cvo/cvo_integration_test.go
rename to pkg/start/start_integration_test.go
index 033ab5dbc..4a401a652 100644
--- a/pkg/cvo/cvo_integration_test.go
+++ b/pkg/start/start_integration_test.go
@@ -1,32 +1,31 @@
-package cvo
+package start
 
 import (
+	"context"
 	"encoding/json"
 	"fmt"
 	"io/ioutil"
 	"os"
 	"path/filepath"
 	"reflect"
+	"regexp"
 	"strings"
 	"testing"
 	"time"
 
-	"k8s.io/apimachinery/pkg/util/diff"
-
 	v1 "k8s.io/api/core/v1"
-	apiext "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
 	"k8s.io/apimachinery/pkg/api/errors"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/types"
+	"k8s.io/apimachinery/pkg/util/diff"
 	randutil "k8s.io/apimachinery/pkg/util/rand"
 	"k8s.io/apimachinery/pkg/util/wait"
 	"k8s.io/client-go/kubernetes"
-	"k8s.io/client-go/tools/clientcmd"
 
 	configv1 "github.com/openshift/api/config/v1"
 	clientset "github.com/openshift/client-go/config/clientset/versioned"
-	informers "github.com/openshift/client-go/config/informers/externalversions"
 	"github.com/openshift/cluster-version-operator/lib/resourcemerge"
+	"github.com/openshift/cluster-version-operator/pkg/cvo"
 )
 
 var (
@@ -171,15 +170,14 @@ func TestIntegrationCVO_initializeAndUpgrade(t *testing.T) {
 	}
 	t.Parallel()
 
-	kcfg := clientcmd.NewNonInteractiveDeferredLoadingClientConfig(clientcmd.NewDefaultClientConfigLoadingRules(), &clientcmd.ConfigOverrides{})
-	cfg, err := kcfg.ClientConfig()
+	// use the same client setup as the start command
+	cb, err := newClientBuilder("")
 	if err != nil {
-		t.Fatalf("cannot load config: %v", err)
+		t.Fatal(err)
 	}
-
-	kc := kubernetes.NewForConfigOrDie(cfg)
-	client := clientset.NewForConfigOrDie(cfg)
-	apiExtClient := apiext.NewForConfigOrDie(cfg)
+	cfg := cb.RestConfig()
+	kc := cb.KubeClientOrDie("integration-test")
+	client := cb.ClientOrDie("integration-test")
 
 	ns := fmt.Sprintf("e2e-cvo-%s", randutil.String(4))
 
@@ -199,11 +197,6 @@ func TestIntegrationCVO_initializeAndUpgrade(t *testing.T) {
 		}
 	}()
 
-	cvInformer := informers.NewFilteredSharedInformerFactory(client, 1*time.Minute, "", func(opts *metav1.ListOptions) {
-		opts.FieldSelector = fmt.Sprintf("metadata.name=%s", ns)
-	})
-	sharedInformers := informers.NewSharedInformerFactory(client, 1*time.Minute)
-
 	dir, err := ioutil.TempDir("", "cvo-test")
 	if err != nil {
 		t.Fatal(err)
@@ -222,25 +215,22 @@ func TestIntegrationCVO_initializeAndUpgrade(t *testing.T) {
 		payloadImage2: filepath.Join(dir, "0.0.2"),
 	}}
 
-	optr := New(
-		"", ns, ns, payloadImage1,
-		filepath.Join(dir, "ignored"),
-		5*time.Second,
-		cvInformer.Config().V1().ClusterVersions(),
-		sharedInformers.Config().V1().ClusterOperators(),
-		cfg,
-		client, kc, apiExtClient,
-		false,
-	)
+	options := NewOptions()
+	options.Namespace = ns
+	options.Name = ns
+	options.ListenAddr = ""
+	options.NodeName = "test-node"
+	options.ReleaseImage = payloadImage1
+	options.PayloadOverride = filepath.Join(dir, "ignored")
+	options.EnableMetrics = false
+	controllers := options.NewControllerContext(cb)
 
-	worker := optr.configSync.(*SyncWorker)
-	worker.retriever = retriever
+	worker := cvo.NewSyncWorker(retriever, cvo.NewResourceBuilder(cfg), 5*time.Second, wait.Backoff{Steps: 3}).(*cvo.SyncWorker)
+	controllers.CVO.SetSyncWorkerForTesting(worker)
 
 	stopCh := make(chan struct{})
 	defer close(stopCh)
-	go cvInformer.Start(stopCh)
-	go sharedInformers.Start(stopCh)
-	go optr.Run(1, stopCh)
+	controllers.Start(stopCh)
 
 	t.Logf("wait until we observe the cluster version become available")
 	lastCV, err := waitForAvailableUpdate(t, client, ns, false, "0.0.1")
@@ -249,7 +239,7 @@ func TestIntegrationCVO_initializeAndUpgrade(t *testing.T) {
 		t.Fatalf("cluster version never became available: %v", err)
 	}
 
-	status := optr.configSync.(*SyncWorker).Status()
+	status := worker.Status()
 
 	t.Logf("verify the available cluster version's status matches our expectations")
 	t.Logf("Cluster version:\n%s", printCV(lastCV))
@@ -258,7 +248,7 @@ func TestIntegrationCVO_initializeAndUpgrade(t *testing.T) {
 
 	t.Logf("wait for the next resync and verify that status didn't change")
 	if err := wait.Poll(time.Second, 30*time.Second, func() (bool, error) {
-		updated := optr.configSync.(*SyncWorker).Status()
+		updated := worker.Status()
 		if updated.Completed > status.Completed {
 			return true, nil
 		}
@@ -300,11 +290,11 @@ func TestIntegrationCVO_initializeAndUpgrade(t *testing.T) {
 		t.Fatalf("couldn't delete CVO managed object: %v", err)
 	}
 
-	status = optr.configSync.(*SyncWorker).Status()
+	status = worker.Status()
 
 	t.Logf("wait for the next resync and verify that status didn't change")
 	if err := wait.Poll(time.Second, 30*time.Second, func() (bool, error) {
-		updated := optr.configSync.(*SyncWorker).Status()
+		updated := worker.Status()
 		if updated.Completed > status.Completed {
 			return true, nil
 		}
@@ -331,15 +321,14 @@ func TestIntegrationCVO_initializeAndHandleError(t *testing.T) {
 	}
 	t.Parallel()
 
-	kcfg := clientcmd.NewNonInteractiveDeferredLoadingClientConfig(clientcmd.NewDefaultClientConfigLoadingRules(), &clientcmd.ConfigOverrides{})
-	cfg, err := kcfg.ClientConfig()
+	// use the same client setup as the start command
+	cb, err := newClientBuilder("")
 	if err != nil {
-		t.Fatalf("cannot load config: %v", err)
+		t.Fatal(err)
 	}
-
-	kc := kubernetes.NewForConfigOrDie(cfg)
-	client := clientset.NewForConfigOrDie(cfg)
-	apiExtClient := apiext.NewForConfigOrDie(cfg)
+	cfg := cb.RestConfig()
+	kc := cb.KubeClientOrDie("integration-test")
+	client := cb.ClientOrDie("integration-test")
 
 	ns := fmt.Sprintf("e2e-cvo-%s", randutil.String(4))
 
@@ -359,11 +348,6 @@ func TestIntegrationCVO_initializeAndHandleError(t *testing.T) {
 		}
 	}()
 
-	cvInformer := informers.NewFilteredSharedInformerFactory(client, 1*time.Minute, "", func(opts *metav1.ListOptions) {
-		opts.FieldSelector = fmt.Sprintf("metadata.name=%s", ns)
-	})
-	sharedInformers := informers.NewSharedInformerFactory(client, 1*time.Minute)
-
 	dir, err := ioutil.TempDir("", "cvo-test")
 	if err != nil {
 		t.Fatal(err)
@@ -382,25 +366,22 @@ func TestIntegrationCVO_initializeAndHandleError(t *testing.T) {
 		payloadImage2: filepath.Join(dir, "0.0.2"),
 	}}
 
-	optr := New(
-		"", ns, ns, payloadImage1,
-		filepath.Join(dir, "ignored"),
-		10*time.Second,
-		cvInformer.Config().V1().ClusterVersions(),
-		sharedInformers.Config().V1().ClusterOperators(),
-		cfg,
-		client, kc, apiExtClient,
-		false,
-	)
+	options := NewOptions()
+	options.Namespace = ns
+	options.Name = ns
+	options.ListenAddr = ""
+	options.NodeName = "test-node"
+	options.ReleaseImage = payloadImage1
+	options.PayloadOverride = filepath.Join(dir, "ignored")
+	options.EnableMetrics = false
+	controllers := options.NewControllerContext(cb)
 
-	worker := optr.configSync.(*SyncWorker)
-	worker.retriever = retriever
+	worker := cvo.NewSyncWorker(retriever, cvo.NewResourceBuilder(cfg), 5*time.Second, wait.Backoff{Steps: 3}).(*cvo.SyncWorker)
+	controllers.CVO.SetSyncWorkerForTesting(worker)
 
 	stopCh := make(chan struct{})
 	defer close(stopCh)
-	go cvInformer.Start(stopCh)
-	go sharedInformers.Start(stopCh)
-	go optr.Run(1, stopCh)
+	controllers.Start(stopCh)
 
 	t.Logf("wait until we observe the cluster version become available")
 	lastCV, err := waitForAvailableUpdate(t, client, ns, false, "0.0.1")
@@ -460,6 +441,116 @@ func TestIntegrationCVO_initializeAndHandleError(t *testing.T) {
 	verifyReleasePayload(t, kc, ns, "0.0.1", payloadImage1)
 }
 
+func TestIntegrationCVO_gracefulStepDown(t *testing.T) {
+	if os.Getenv("TEST_INTEGRATION") != "1" {
+		t.Skipf("Integration tests are disabled unless TEST_INTEGRATION=1")
+	}
+	t.Parallel()
+
+	// use the same client setup as the start command
+	cb, err := newClientBuilder("")
+	if err != nil {
+		t.Fatal(err)
+	}
+	cfg := cb.RestConfig()
+	kc := cb.KubeClientOrDie("integration-test")
+	client := cb.ClientOrDie("integration-test")
+
+	ns := fmt.Sprintf("e2e-cvo-%s", randutil.String(4))
+
+	if _, err := kc.Core().Namespaces().Create(&v1.Namespace{
+		ObjectMeta: metav1.ObjectMeta{
+			Name: ns,
+		},
+	}); err != nil {
+		t.Fatal(err)
+	}
+	defer func() {
+		if err := client.Config().ClusterVersions().Delete(ns, nil); err != nil {
+			t.Logf("failed to delete cluster version %s: %v", ns, err)
+		}
+		if err := kc.Core().Namespaces().Delete(ns, nil); err != nil {
+			t.Logf("failed to delete namespace %s: %v", ns, err)
+		}
+	}()
+
+	options := NewOptions()
+	options.Namespace = ns
+	options.Name = ns
+	options.ListenAddr = ""
+	options.NodeName = "test-node"
+	options.EnableMetrics = false
+	controllers := options.NewControllerContext(cb)
+
+	worker := cvo.NewSyncWorker(&mapPayloadRetriever{}, cvo.NewResourceBuilder(cfg), 5*time.Second, wait.Backoff{Steps: 3}).(*cvo.SyncWorker)
+	controllers.CVO.SetSyncWorkerForTesting(worker)
+
+	lock, err := createResourceLock(cb, ns, ns)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	t.Logf("the controller should create a lock record on a config map")
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+	done := make(chan struct{})
+	go func() {
+		options.run(ctx, controllers, lock)
+		close(done)
+	}()
+
+	// wait until the lock record exists
+	err = wait.PollImmediate(200*time.Millisecond, 60*time.Second, func() (bool, error) {
+		_, err := kc.Core().ConfigMaps(ns).Get(ns, metav1.GetOptions{})
+		if err != nil {
+			if errors.IsNotFound(err) {
+				return false, nil
+			}
+			return false, err
+		}
+		return true, nil
+	})
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	t.Logf("verify the controller writes a leadership change event")
+	events, err := kc.Core().Events(ns).List(metav1.ListOptions{})
+	if err != nil {
+		t.Fatal(err)
+	}
+	if !hasLeaderEvent(events.Items, ns) {
+		t.Fatalf("no leader election events found in\n%#v", events.Items)
+	}
+
+	t.Logf("after the context is closed, the lock record should be deleted quickly")
+	cancel()
+	startTime := time.Now()
+	var endTime time.Time
+	// the lock should be deleted immediately
+	err = wait.PollImmediate(100*time.Millisecond, 3*time.Second, func() (bool, error) {
+		_, err := kc.Core().ConfigMaps(ns).Get(ns, metav1.GetOptions{})
+		if errors.IsNotFound(err) {
+			endTime = time.Now()
+			return true, nil
+		}
+		if err != nil {
+			return false, err
+		}
+		return false, nil
+	})
+	if err != nil {
+		t.Fatal(err)
+	}
+	t.Logf("lock deleted in %s", endTime.Sub(startTime))
+
+	select {
+	case <-time.After(time.Second):
+		t.Fatalf("controller should exit more quickly")
+	case <-done:
+	}
+}
+
 // waitForAvailableUpdates checks invariants during an upgrade process. versions is a list of the expected versions that
 // should be seen during update, with the last version being the one we wait to see.
 func waitForAvailableUpdate(t *testing.T, client clientset.Interface, ns string, allowIncrementalFailure bool, versions ...string) (*configv1.ClusterVersion, error) {
@@ -703,6 +794,15 @@ func verifyReleasePayloadConfigMap2(t *testing.T, kc kubernetes.Interface, ns, v
 	}
 }
 
+func hasLeaderEvent(events []v1.Event, name string) bool {
+	for _, event := range events {
+		if event.Reason == "LeaderElection" && event.InvolvedObject.Name == name {
+			return true
+		}
+	}
+	return false
+}
+
 func printCV(cv *configv1.ClusterVersion) string {
 	data, err := json.MarshalIndent(cv, "", "  ")
 	if err != nil {
@@ -710,3 +810,74 @@ func printCV(cv *configv1.ClusterVersion) string {
 	}
 	return string(data)
 }
+
+var reVariable = regexp.MustCompile(`\$\([a-zA-Z0-9_\-]+\)`)
+
+func TestCreateContentReplacement(t *testing.T) {
+	replacements := []map[string]string{
+		{"NS": "other"},
+	}
+	in := `Some stuff $(NS) that should be $(NS)`
+	out := reVariable.ReplaceAllStringFunc(in, func(key string) string {
+		key = key[2 : len(key)-1]
+		for _, r := range replacements {
+			v, ok := r[key]
+			if !ok {
+				continue
+			}
+			return v
+		}
+		return key
+	})
+	if out != `Some stuff other that should be other` {
+		t.Fatal(out)
+	}
+}
+
+func createContent(baseDir string, content map[string]interface{}, replacements ...map[string]string) error {
+	if err := os.MkdirAll(baseDir, 0750); err != nil {
+		return err
+	}
+	for k, v := range content {
+		switch t := v.(type) {
+		case string:
+			if len(replacements) > 0 {
+				t = reVariable.ReplaceAllStringFunc(t, func(key string) string {
+					key = key[2 : len(key)-1]
+					for _, r := range replacements {
+						v, ok := r[key]
+						if !ok {
+							continue
+						}
+						return v
+					}
+					return key
+				})
+			}
+			if err := ioutil.WriteFile(filepath.Join(baseDir, k), []byte(t), 0640); err != nil {
+				return err
+			}
+		case map[string]interface{}:
+			dir := filepath.Join(baseDir, k)
+			if err := os.Mkdir(dir, 0750); err != nil {
+				return err
+			}
+			if err := createContent(dir, t, replacements...); err != nil {
+				return err
+			}
+		}
+	}
+	return nil
+}
+
+type mapPayloadRetriever struct {
+	Paths map[string]string
+}
+
+func (r *mapPayloadRetriever) RetrievePayload(ctx context.Context, update configv1.Update) (string, error) {
+	path, ok := r.Paths[update.Payload]
+	if !ok {
+		return "", fmt.Errorf("no payload found for %q", update.Payload)
+	}
+	return path, nil
+}