From b1a0f9292de14b745bb48cfa7e0b9fab8c9fcbb3 Mon Sep 17 00:00:00 2001 From: Jack Ottofaro Date: Thu, 14 May 2020 21:36:01 -0400 Subject: [PATCH] new changes --- pkg/cvo/cvo.go | 2 +- pkg/cvo/cvo_scenarios_test.go | 74 ++++++++++++++++--- pkg/cvo/sync_test.go | 5 +- pkg/cvo/sync_worker.go | 60 ++++++++------- pkg/cvo/upgradeable.go | 12 +++ .../clusterversion/upgradable_test.go | 16 +--- .../clusterversion/upgradeable.go | 20 +++-- pkg/payload/precondition/precondition.go | 7 +- 8 files changed, 127 insertions(+), 69 deletions(-) diff --git a/pkg/cvo/cvo.go b/pkg/cvo/cvo.go index a4c561488..25608ecd2 100644 --- a/pkg/cvo/cvo.go +++ b/pkg/cvo/cvo.go @@ -340,7 +340,7 @@ func (optr *Operator) Run(ctx context.Context, workers int) { // start the config sync loop, and have it notify the queue when new status is detected go runThrottledStatusNotifier(stopCh, optr.statusInterval, 2, optr.configSync.StatusCh(), func() { optr.queue.Add(optr.queueKey()) }) - go optr.configSync.Start(ctx, 16) + go optr.configSync.Start(ctx, 16, optr.checkForClusterVersionOverridesSet()) go wait.Until(func() { optr.worker(optr.availableUpdatesQueue, optr.availableUpdatesSync) }, time.Second, stopCh) go wait.Until(func() { optr.worker(optr.upgradeableQueue, optr.upgradeableSync) }, time.Second, stopCh) go wait.Until(func() { diff --git a/pkg/cvo/cvo_scenarios_test.go b/pkg/cvo/cvo_scenarios_test.go index 053c23666..e64338322 100644 --- a/pkg/cvo/cvo_scenarios_test.go +++ b/pkg/cvo/cvo_scenarios_test.go @@ -98,6 +98,12 @@ func setupCVOTest(payloadDir string) (*Operator, map[string]runtime.Object, *fak } func TestCVO_StartupAndSync(t *testing.T) { + + cond := &configv1.ClusterOperatorStatusCondition{ + Type: configv1.ClusterStatusConditionType("UpgradeableClusterVersionOverrides"), + Status: configv1.ConditionFalse, + } + o, cvs, client, _, shutdownFn := setupCVOTest("testdata/payloadtest") ctx, cancel := context.WithCancel(context.Background()) @@ -105,7 +111,7 @@ func TestCVO_StartupAndSync(t 
*testing.T) { defer shutdownFn() worker := o.configSync.(*SyncWorker) - go worker.Start(ctx, 1) + go worker.Start(ctx, 1, cond) // Step 1: Verify the CVO creates the initial Cluster Version object // @@ -352,6 +358,11 @@ func TestCVO_StartupAndSync(t *testing.T) { } func TestCVO_StartupAndSyncUnverifiedPayload(t *testing.T) { + + cond := &configv1.ClusterOperatorStatusCondition{ + Type: configv1.ClusterStatusConditionType("UpgradeableClusterVersionOverrides"), + Status: configv1.ConditionFalse, + } o, cvs, client, _, shutdownFn := setupCVOTest("testdata/payloadtest") ctx, cancel := context.WithCancel(context.Background()) @@ -376,7 +387,7 @@ func TestCVO_StartupAndSyncUnverifiedPayload(t *testing.T) { VerificationError: payloadErr, } - go worker.Start(ctx, 1) + go worker.Start(ctx, 1, cond) // Step 1: Verify the CVO creates the initial Cluster Version object // @@ -623,6 +634,11 @@ func TestCVO_StartupAndSyncUnverifiedPayload(t *testing.T) { } func TestCVO_StartupAndSyncPreconditionFailing(t *testing.T) { + + cond := &configv1.ClusterOperatorStatusCondition{ + Type: configv1.ClusterStatusConditionType("UpgradeableClusterVersionOverrides"), + Status: configv1.ConditionFalse, + } o, cvs, client, _, shutdownFn := setupCVOTest("testdata/payloadtest") ctx, cancel := context.WithCancel(context.Background()) @@ -637,7 +653,7 @@ func TestCVO_StartupAndSyncPreconditionFailing(t *testing.T) { Directory: "testdata/payloadtest", Local: true, } - go worker.Start(ctx, 1) + go worker.Start(ctx, 1, cond) // Step 1: Verify the CVO creates the initial Cluster Version object // @@ -884,6 +900,11 @@ func TestCVO_StartupAndSyncPreconditionFailing(t *testing.T) { } func TestCVO_UpgradeUnverifiedPayload(t *testing.T) { + + cond := &configv1.ClusterOperatorStatusCondition{ + Type: configv1.ClusterStatusConditionType("UpgradeableClusterVersionOverrides"), + Status: configv1.ConditionFalse, + } o, cvs, client, _, shutdownFn := setupCVOTest("testdata/payloadtest-2") // Setup: a successful 
sync from a previous run, and the operator at the same image as before @@ -937,7 +958,7 @@ func TestCVO_UpgradeUnverifiedPayload(t *testing.T) { retriever := worker.retriever.(*fakeDirectoryRetriever) retriever.Set(PayloadInfo{}, payloadErr) - go worker.Start(ctx, 1) + go worker.Start(ctx, 1, cond) // Step 1: The operator should report that it is blocked on unverified content // @@ -1109,6 +1130,11 @@ func TestCVO_UpgradeUnverifiedPayload(t *testing.T) { } func TestCVO_UpgradeUnverifiedPayloadRetriveOnce(t *testing.T) { + + cond := &configv1.ClusterOperatorStatusCondition{ + Type: configv1.ClusterStatusConditionType("UpgradeableClusterVersionOverrides"), + Status: configv1.ConditionFalse, + } o, cvs, client, _, shutdownFn := setupCVOTest("testdata/payloadtest-2") // Setup: a successful sync from a previous run, and the operator at the same image as before @@ -1162,7 +1188,7 @@ func TestCVO_UpgradeUnverifiedPayloadRetriveOnce(t *testing.T) { retriever := worker.retriever.(*fakeDirectoryRetriever) retriever.Set(PayloadInfo{}, payloadErr) - go worker.Start(ctx, 1) + go worker.Start(ctx, 1, cond) // Step 1: The operator should report that it is blocked on unverified content // @@ -1371,6 +1397,11 @@ func TestCVO_UpgradeUnverifiedPayloadRetriveOnce(t *testing.T) { } func TestCVO_UpgradePreconditionFailing(t *testing.T) { + + cond := &configv1.ClusterOperatorStatusCondition{ + Type: configv1.ClusterStatusConditionType("UpgradeableClusterVersionOverrides"), + Status: configv1.ConditionFalse, + } o, cvs, client, _, shutdownFn := setupCVOTest("testdata/payloadtest-2") // Setup: a successful sync from a previous run, and the operator at the same image as before @@ -1414,7 +1445,7 @@ func TestCVO_UpgradePreconditionFailing(t *testing.T) { worker := o.configSync.(*SyncWorker) worker.preconditions = []precondition.Precondition{&testPrecondition{SuccessAfter: 3}} - go worker.Start(ctx, 1) + go worker.Start(ctx, 1, cond) // Step 1: The operator should report that it is blocked on 
precondition checks failing // @@ -1595,6 +1626,11 @@ func TestCVO_UpgradePreconditionFailing(t *testing.T) { } func TestCVO_UpgradeVerifiedPayload(t *testing.T) { + + cond := &configv1.ClusterOperatorStatusCondition{ + Type: configv1.ClusterStatusConditionType("UpgradeableClusterVersionOverrides"), + Status: configv1.ConditionFalse, + } o, cvs, client, _, shutdownFn := setupCVOTest("testdata/payloadtest-2") // Setup: a successful sync from a previous run, and the operator at the same image as before @@ -1649,7 +1685,7 @@ func TestCVO_UpgradeVerifiedPayload(t *testing.T) { retriever := worker.retriever.(*fakeDirectoryRetriever) retriever.Set(PayloadInfo{}, payloadErr) - go worker.Start(ctx, 1) + go worker.Start(ctx, 1, cond) // Step 1: The operator should report that it is blocked on unverified content // @@ -1817,6 +1853,11 @@ func TestCVO_UpgradeVerifiedPayload(t *testing.T) { }) } func TestCVO_RestartAndReconcile(t *testing.T) { + + cond := &configv1.ClusterOperatorStatusCondition{ + Type: configv1.ClusterStatusConditionType("UpgradeableClusterVersionOverrides"), + Status: configv1.ConditionFalse, + } o, cvs, client, _, shutdownFn := setupCVOTest("testdata/payloadtest") ctx, cancel := context.WithCancel(context.Background()) @@ -1885,7 +1926,7 @@ func TestCVO_RestartAndReconcile(t *testing.T) { // Step 2: Start the sync worker and verify the sequence of events, and then verify // the status does not change // - go worker.Start(ctx, 1) + go worker.Start(ctx, 1, cond) // verifyAllStatus(t, worker.StatusCh(), SyncWorkerStatus{ @@ -2045,9 +2086,14 @@ func TestCVO_ErrorDuringReconcile(t *testing.T) { t.Fatalf("The worker should be reconciling: %v", worker.work) } + cond := &configv1.ClusterOperatorStatusCondition{ + Type: configv1.ClusterStatusConditionType("UpgradeableClusterVersionOverrides"), + Status: configv1.ConditionFalse, + } + // Step 2: Start the sync worker and verify the sequence of events // - go worker.Start(ctx, 1) + go worker.Start(ctx, 1, cond) // 
verifyAllStatus(t, worker.StatusCh(), SyncWorkerStatus{ @@ -2182,6 +2228,11 @@ func TestCVO_ErrorDuringReconcile(t *testing.T) { } func TestCVO_ParallelError(t *testing.T) { + + cond := &configv1.ClusterOperatorStatusCondition{ + Type: configv1.ClusterStatusConditionType("UpgradeableClusterVersionOverrides"), + Status: configv1.ConditionFalse, + } o, cvs, client, _, shutdownFn := setupCVOTest("testdata/paralleltest") ctx, cancel := context.WithCancel(context.Background()) @@ -2251,7 +2302,7 @@ func TestCVO_ParallelError(t *testing.T) { // cancellable, cancel := context.WithCancel(ctx) defer cancel() - go worker.Start(cancellable, 1) + go worker.Start(cancellable, 1, cond) // verifyAllStatus(t, worker.StatusCh(), SyncWorkerStatus{ @@ -2469,7 +2520,7 @@ func verifyAllStatus(t *testing.T, ch <-chan SyncWorkerStatus, items ...SyncWork t.Helper() if len(items) == 0 { if len(ch) > 0 { - t.Fatalf("expected status to empty, got %#v", <-ch) + t.Fatalf("expected status to be empty, got %#v", <-ch) } return } @@ -2487,7 +2538,6 @@ func verifyAllStatus(t *testing.T, ch <-chan SyncWorkerStatus, items ...SyncWork } else if !lastTime.IsZero() { actual.LastProgress = time.Unix(count, 0) } - if !reflect.DeepEqual(expect, actual) { t.Fatalf("unexpected status item %d: %s", i, diff.ObjectReflectDiff(expect, actual)) } diff --git a/pkg/cvo/sync_test.go b/pkg/cvo/sync_test.go index d8a7c35ea..9f5c96b49 100644 --- a/pkg/cvo/sync_test.go +++ b/pkg/cvo/sync_test.go @@ -387,7 +387,8 @@ func (r *fakeSyncRecorder) StatusCh() <-chan SyncWorkerStatus { return ch } -func (r *fakeSyncRecorder) Start(ctx context.Context, maxWorkers int) {} +func (r *fakeSyncRecorder) Start(ctx context.Context, maxWorkers int, statusCond *configv1.ClusterOperatorStatusCondition) { +} func (r *fakeSyncRecorder) Update(generation int64, desired configv1.Update, overrides []configv1.ComponentOverride, state payload.State) *SyncWorkerStatus { r.Updates = append(r.Updates, desired) @@ -453,7 +454,7 @@ func (pf 
*testPrecondition) Name() string { return fmt.Sprintf("TestPrecondition SuccessAfter: %d", pf.SuccessAfter) } -func (pf *testPrecondition) Run(_ context.Context, _ precondition.ReleaseContext) error { +func (pf *testPrecondition) Run(_ context.Context, _ precondition.ReleaseContext, statusCond *configv1.ClusterOperatorStatusCondition) error { if pf.SuccessAfter == 0 { return nil } diff --git a/pkg/cvo/sync_worker.go b/pkg/cvo/sync_worker.go index b06474eab..2be0b8fa8 100644 --- a/pkg/cvo/sync_worker.go +++ b/pkg/cvo/sync_worker.go @@ -29,7 +29,7 @@ import ( // ConfigSyncWorker abstracts how the image is synchronized to the server. Introduced for testing. type ConfigSyncWorker interface { - Start(ctx context.Context, maxWorkers int) + Start(ctx context.Context, maxWorkers int, statusCond *configv1.ClusterOperatorStatusCondition) Update(generation int64, desired configv1.Update, overrides []configv1.ComponentOverride, state payload.State) *SyncWorkerStatus StatusCh() <-chan SyncWorkerStatus } @@ -242,7 +242,7 @@ func (w *SyncWorker) Update(generation int64, desired configv1.Update, overrides // Start periodically invokes run, detecting whether content has changed. // It is edge-triggered when Update() is invoked and level-driven after the // syncOnce() has succeeded for a given input (we are said to be "reconciling"). 
-func (w *SyncWorker) Start(ctx context.Context, maxWorkers int) { +func (w *SyncWorker) Start(ctx context.Context, maxWorkers int, statusCond *configv1.ClusterOperatorStatusCondition) { klog.V(5).Infof("Starting sync worker") work := &SyncWork{} @@ -311,7 +311,7 @@ func (w *SyncWorker) Start(ctx context.Context, maxWorkers int) { // so that we don't fail, then immediately start reporting an earlier status reporter := &statusWrapper{w: w, previousStatus: w.Status()} klog.V(5).Infof("Previous sync status: %#v", reporter.previousStatus) - return w.syncOnce(ctx, work, maxWorkers, reporter) + return w.syncOnce(ctx, work, maxWorkers, reporter, statusCond) }() if err != nil { // backoff wait @@ -467,7 +467,7 @@ func (w *SyncWorker) Status() *SyncWorkerStatus { // sync retrieves the image and applies it to the server, returning an error if // the update could not be completely applied. The status is updated as we progress. // Cancelling the context will abort the execution of the sync. -func (w *SyncWorker) syncOnce(ctx context.Context, work *SyncWork, maxWorkers int, reporter StatusReporter) error { +func (w *SyncWorker) syncOnce(ctx context.Context, work *SyncWork, maxWorkers int, reporter StatusReporter, statusCond *configv1.ClusterOperatorStatusCondition) error { klog.V(4).Infof("Running sync %s (force=%t) on generation %d in state %s at attempt %d", versionString(work.Desired), work.Desired.Force, work.Generation, work.State, work.Attempt) update := work.Desired @@ -527,7 +527,7 @@ func (w *SyncWorker) syncOnce(ctx context.Context, work *SyncWork, maxWorkers in Actual: update, Verified: info.Verified, }) - if err := precondition.Summarize(w.preconditions.RunAll(ctx, precondition.ReleaseContext{DesiredVersion: payloadUpdate.ReleaseVersion})); err != nil { + if err := precondition.Summarize(w.preconditions.RunAll(ctx, precondition.ReleaseContext{DesiredVersion: payloadUpdate.ReleaseVersion}, statusCond)); err != nil { klog.Infof("precondition error %v", err) if 
update.Force { klog.V(4).Infof("Forcing past precondition failures: %s", err) @@ -646,37 +646,35 @@ func (w *SyncWorker) apply(ctx context.Context, payloadUpdate *payload.Update, w }) } - /* - // update each object - errs := payload.RunGraph(ctx, graph, maxWorkers, func(ctx context.Context, tasks []*payload.Task) error { - for _, task := range tasks { - if contextIsCancelled(ctx) { - return cr.CancelError() - } - cr.Update() + // update each object + errs := payload.RunGraph(ctx, graph, maxWorkers, func(ctx context.Context, tasks []*payload.Task) error { + for _, task := range tasks { + if contextIsCancelled(ctx) { + return cr.CancelError() + } + cr.Update() - klog.V(4).Infof("Running sync for %s", task) - klog.V(5).Infof("Manifest: %s", string(task.Manifest.Raw)) + klog.V(4).Infof("Running sync for %s", task) + klog.V(5).Infof("Manifest: %s", string(task.Manifest.Raw)) - ov, ok := getOverrideForManifest(work.Overrides, w.exclude, task.Manifest) - if ok && ov.Unmanaged { - klog.V(4).Infof("Skipping %s as unmanaged", task) - continue - } + ov, ok := getOverrideForManifest(work.Overrides, w.exclude, task.Manifest) + if ok && ov.Unmanaged { + klog.V(4).Infof("Skipping %s as unmanaged", task) + continue + } - if err := task.Run(ctx, version, w.builder, work.State); err != nil { - return err - } - cr.Inc() - klog.V(4).Infof("Done syncing for %s", task) + if err := task.Run(ctx, version, w.builder, work.State); err != nil { + return err } - return nil - }) - if len(errs) > 0 { - err := cr.Errors(errs) - return err + cr.Inc() + klog.V(4).Infof("Done syncing for %s", task) } - */ + return nil + }) + if len(errs) > 0 { + err := cr.Errors(errs) + return err + } // update the status cr.Complete() diff --git a/pkg/cvo/upgradeable.go b/pkg/cvo/upgradeable.go index da29b86a5..c7af08e3f 100644 --- a/pkg/cvo/upgradeable.go +++ b/pkg/cvo/upgradeable.go @@ -19,6 +19,18 @@ import ( "github.com/openshift/cluster-version-operator/lib/resourcemerge" ) +func (optr *Operator) 
checkForClusterVersionOverridesSet() *configv1.ClusterOperatorStatusCondition { + + for _, check := range optr.upgradeableChecks { + if cond := check.Check(); cond != nil { + if cond.Reason == "ClusterVersionOverridesSet" { + return cond + } + } + } + return nil +} + // syncUpgradeable. The status is only checked if it has been more than // the minimumUpdateCheckInterval since the last check. func (optr *Operator) syncUpgradeable(config *configv1.ClusterVersion) error { diff --git a/pkg/payload/precondition/clusterversion/upgradable_test.go b/pkg/payload/precondition/clusterversion/upgradable_test.go index 0e720d27b..7bd48bce8 100644 --- a/pkg/payload/precondition/clusterversion/upgradable_test.go +++ b/pkg/payload/precondition/clusterversion/upgradable_test.go @@ -102,20 +102,6 @@ func TestUpgradeableRun(t *testing.T) { upgradeable: ptr(configv1.ConditionFalse), currVersion: "4.1.3", desiredVersion: "4.1.4", - expected: "set to False", - }, - { - name: "move-z, with true condition", - upgradeable: ptr(configv1.ConditionTrue), - currVersion: "4.1.3", - desiredVersion: "4.1.4", - expected: "", - }, - { - name: "move-z, with unknown condition", - upgradeable: ptr(configv1.ConditionUnknown), - currVersion: "4.1.3", - desiredVersion: "4.1.4", expected: "", }, } @@ -142,7 +128,7 @@ func TestUpgradeableRun(t *testing.T) { cvLister := fakeClusterVersionLister(clusterVersion) instance := NewUpgradeable(cvLister) - err := instance.Run(context.TODO(), precondition.ReleaseContext{DesiredVersion: tc.desiredVersion}) + err := instance.Run(context.TODO(), precondition.ReleaseContext{DesiredVersion: tc.desiredVersion}, nil) switch { case err != nil && len(tc.expected) == 0: t.Error(err) diff --git a/pkg/payload/precondition/clusterversion/upgradeable.go b/pkg/payload/precondition/clusterversion/upgradeable.go index 588d1c685..faafb3243 100644 --- a/pkg/payload/precondition/clusterversion/upgradeable.go +++ b/pkg/payload/precondition/clusterversion/upgradeable.go @@ -31,7 +31,7 @@ 
func NewUpgradeable(lister configv1listers.ClusterVersionLister) *Upgradeable { // Run runs the Upgradeable precondition. // If the feature gate `key` is not found, or the api for clusterversion doesn't exist, this check is inert and always returns nil error. // Otherwise, if Upgradeable condition is set to false in the object, it returns an PreconditionError when possible. -func (pf *Upgradeable) Run(ctx context.Context, releaseContext precondition.ReleaseContext) error { +func (pf *Upgradeable) Run(ctx context.Context, releaseContext precondition.ReleaseContext, statusCond *configv1.ClusterOperatorStatusCondition) error { cv, err := pf.lister.Get(pf.key) if apierrors.IsNotFound(err) || meta.IsNoMatchError(err) { return nil @@ -64,17 +64,27 @@ func (pf *Upgradeable) Run(ctx context.Context, releaseContext precondition.Rele return nil } - // only perform minor version check if upgradeable is not false - //if up.Status != configv1.ConditionFalse { currentMinor := getEffectiveMinor(cv.Status.History[0].Version) desiredMinor := getEffectiveMinor(releaseContext.DesiredVersion) // if there is no difference in the minor version (4.y.z where 4.y is the same for current and desired), then we can still upgrade + // unless the supplied status condition reports ClusterVersionOverridesSet, in which case the z-level upgrade is blocked if currentMinor == desiredMinor { klog.V(4).Infof("Precondition %q passed: minor from the current %s matches minor from the target %s (both %s).", pf.Name(), cv.Status.History[0].Version, releaseContext.DesiredVersion, currentMinor) - return nil + + if statusCond != nil && statusCond.Reason == "ClusterVersionOverridesSet" { + klog.V(4).Infof("ClusterVersionOverridesSet.
Z-level upgrade will not start.") + + return &precondition.Error{ + Nested: err, + Reason: statusCond.Reason, + Message: statusCond.Message, + Name: pf.Name(), + } + } else { + return nil + } } - //} return &precondition.Error{ Nested: err, diff --git a/pkg/payload/precondition/precondition.go b/pkg/payload/precondition/precondition.go index 258526192..a5787e822 100644 --- a/pkg/payload/precondition/precondition.go +++ b/pkg/payload/precondition/precondition.go @@ -5,6 +5,7 @@ import ( "fmt" "strings" + configv1 "github.com/openshift/api/config/v1" "k8s.io/klog" "github.com/openshift/cluster-version-operator/pkg/payload" @@ -41,7 +42,7 @@ type ReleaseContext struct { // Precondition defines the precondition check for a payload. type Precondition interface { // Run executes the precondition checks ands returns an error when the precondition fails. - Run(ctx context.Context, releaseContext ReleaseContext) error + Run(ctx context.Context, releaseContext ReleaseContext, statusCond *configv1.ClusterOperatorStatusCondition) error // Name returns a human friendly name for the precondition. Name() string @@ -52,11 +53,11 @@ type List []Precondition // RunAll runs all the reflight checks in order, returning a list of errors if any. // All checks are run, regardless if any one precondition fails. -func (pfList List) RunAll(ctx context.Context, releaseContext ReleaseContext) []error { +func (pfList List) RunAll(ctx context.Context, releaseContext ReleaseContext, statusCond *configv1.ClusterOperatorStatusCondition) []error { var errs []error for _, pf := range pfList { klog.Infof("Running precondition %q", pf.Name()) - if err := pf.Run(ctx, releaseContext); err != nil { + if err := pf.Run(ctx, releaseContext, statusCond); err != nil { klog.Errorf("Precondition %q failed: %v", pf.Name(), err) errs = append(errs, err) }