new changes
jottofar committed May 20, 2020
1 parent 03f89b2 commit b1a0f92
Showing 8 changed files with 127 additions and 69 deletions.
2 changes: 1 addition & 1 deletion pkg/cvo/cvo.go
@@ -340,7 +340,7 @@ func (optr *Operator) Run(ctx context.Context, workers int) {

// start the config sync loop, and have it notify the queue when new status is detected
go runThrottledStatusNotifier(stopCh, optr.statusInterval, 2, optr.configSync.StatusCh(), func() { optr.queue.Add(optr.queueKey()) })
go optr.configSync.Start(ctx, 16)
go optr.configSync.Start(ctx, 16, optr.checkForClusterVersionOverridesSet())
go wait.Until(func() { optr.worker(optr.availableUpdatesQueue, optr.availableUpdatesSync) }, time.Second, stopCh)
go wait.Until(func() { optr.worker(optr.upgradeableQueue, optr.upgradeableSync) }, time.Second, stopCh)
go wait.Until(func() {
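Read together with the hunks that follow, this one-line change is the entry point for the whole commit: the overrides condition is looked up once at startup and threaded down to the preconditions. A rough sketch of that flow, using only names that appear elsewhere in this diff (a summary, not verbatim code):

	// 1. At startup, ask the upgradeable checks whether spec.overrides is set and keep
	//    the condition whose Reason is ClusterVersionOverridesSet, if any
	//    (pkg/cvo/upgradeable.go below); the result may be nil.
	cond := optr.checkForClusterVersionOverridesSet()

	// 2. Hand that condition to the sync worker when the sync loop starts.
	go optr.configSync.Start(ctx, 16, cond)

	// 3. The worker forwards it to every precondition run before applying a payload
	//    (pkg/cvo/sync_worker.go below):
	//    precondition.Summarize(w.preconditions.RunAll(ctx, releaseContext, statusCond))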
74 changes: 62 additions & 12 deletions pkg/cvo/cvo_scenarios_test.go
@@ -98,14 +98,20 @@ func setupCVOTest(payloadDir string) (*Operator, map[string]runtime.Object, *fak
}

func TestCVO_StartupAndSync(t *testing.T) {

cond := &configv1.ClusterOperatorStatusCondition{
Type: configv1.ClusterStatusConditionType("UpgradeableClusterVersionOverrides"),
Status: configv1.ConditionFalse,
}

o, cvs, client, _, shutdownFn := setupCVOTest("testdata/payloadtest")

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

defer shutdownFn()
worker := o.configSync.(*SyncWorker)
go worker.Start(ctx, 1)
go worker.Start(ctx, 1, cond)

// Step 1: Verify the CVO creates the initial Cluster Version object
//
@@ -352,6 +358,11 @@ func TestCVO_StartupAndSync(t *testing.T) {
}

func TestCVO_StartupAndSyncUnverifiedPayload(t *testing.T) {

cond := &configv1.ClusterOperatorStatusCondition{
Type: configv1.ClusterStatusConditionType("UpgradeableClusterVersionOverrides"),
Status: configv1.ConditionFalse,
}
o, cvs, client, _, shutdownFn := setupCVOTest("testdata/payloadtest")

ctx, cancel := context.WithCancel(context.Background())
@@ -376,7 +387,7 @@ func TestCVO_StartupAndSyncUnverifiedPayload(t *testing.T) {
VerificationError: payloadErr,
}

go worker.Start(ctx, 1)
go worker.Start(ctx, 1, cond)

// Step 1: Verify the CVO creates the initial Cluster Version object
//
@@ -623,6 +634,11 @@ func TestCVO_StartupAndSyncUnverifiedPayload(t *testing.T) {
}

func TestCVO_StartupAndSyncPreconditionFailing(t *testing.T) {

cond := &configv1.ClusterOperatorStatusCondition{
Type: configv1.ClusterStatusConditionType("UpgradeableClusterVersionOverrides"),
Status: configv1.ConditionFalse,
}
o, cvs, client, _, shutdownFn := setupCVOTest("testdata/payloadtest")

ctx, cancel := context.WithCancel(context.Background())
@@ -637,7 +653,7 @@ func TestCVO_StartupAndSyncPreconditionFailing(t *testing.T) {
Directory: "testdata/payloadtest",
Local: true,
}
go worker.Start(ctx, 1)
go worker.Start(ctx, 1, cond)

// Step 1: Verify the CVO creates the initial Cluster Version object
//
@@ -884,6 +900,11 @@ func TestCVO_StartupAndSyncPreconditionFailing(t *testing.T) {
}

func TestCVO_UpgradeUnverifiedPayload(t *testing.T) {

cond := &configv1.ClusterOperatorStatusCondition{
Type: configv1.ClusterStatusConditionType("UpgradeableClusterVersionOverrides"),
Status: configv1.ConditionFalse,
}
o, cvs, client, _, shutdownFn := setupCVOTest("testdata/payloadtest-2")

// Setup: a successful sync from a previous run, and the operator at the same image as before
@@ -937,7 +958,7 @@ func TestCVO_UpgradeUnverifiedPayload(t *testing.T) {
retriever := worker.retriever.(*fakeDirectoryRetriever)
retriever.Set(PayloadInfo{}, payloadErr)

go worker.Start(ctx, 1)
go worker.Start(ctx, 1, cond)

// Step 1: The operator should report that it is blocked on unverified content
//
@@ -1109,6 +1130,11 @@ func TestCVO_UpgradeUnverifiedPayload(t *testing.T) {
}

func TestCVO_UpgradeUnverifiedPayloadRetriveOnce(t *testing.T) {

cond := &configv1.ClusterOperatorStatusCondition{
Type: configv1.ClusterStatusConditionType("UpgradeableClusterVersionOverrides"),
Status: configv1.ConditionFalse,
}
o, cvs, client, _, shutdownFn := setupCVOTest("testdata/payloadtest-2")

// Setup: a successful sync from a previous run, and the operator at the same image as before
@@ -1162,7 +1188,7 @@ func TestCVO_UpgradeUnverifiedPayloadRetriveOnce(t *testing.T) {
retriever := worker.retriever.(*fakeDirectoryRetriever)
retriever.Set(PayloadInfo{}, payloadErr)

go worker.Start(ctx, 1)
go worker.Start(ctx, 1, cond)

// Step 1: The operator should report that it is blocked on unverified content
//
@@ -1371,6 +1397,11 @@ func TestCVO_UpgradeUnverifiedPayloadRetriveOnce(t *testing.T) {
}

func TestCVO_UpgradePreconditionFailing(t *testing.T) {

cond := &configv1.ClusterOperatorStatusCondition{
Type: configv1.ClusterStatusConditionType("UpgradeableClusterVersionOverrides"),
Status: configv1.ConditionFalse,
}
o, cvs, client, _, shutdownFn := setupCVOTest("testdata/payloadtest-2")

// Setup: a successful sync from a previous run, and the operator at the same image as before
@@ -1414,7 +1445,7 @@ func TestCVO_UpgradePreconditionFailing(t *testing.T) {
worker := o.configSync.(*SyncWorker)
worker.preconditions = []precondition.Precondition{&testPrecondition{SuccessAfter: 3}}

go worker.Start(ctx, 1)
go worker.Start(ctx, 1, cond)

// Step 1: The operator should report that it is blocked on precondition checks failing
//
@@ -1595,6 +1626,11 @@ func TestCVO_UpgradePreconditionFailing(t *testing.T) {
}

func TestCVO_UpgradeVerifiedPayload(t *testing.T) {

cond := &configv1.ClusterOperatorStatusCondition{
Type: configv1.ClusterStatusConditionType("UpgradeableClusterVersionOverrides"),
Status: configv1.ConditionFalse,
}
o, cvs, client, _, shutdownFn := setupCVOTest("testdata/payloadtest-2")

// Setup: a successful sync from a previous run, and the operator at the same image as before
@@ -1649,7 +1685,7 @@ func TestCVO_UpgradeVerifiedPayload(t *testing.T) {
retriever := worker.retriever.(*fakeDirectoryRetriever)
retriever.Set(PayloadInfo{}, payloadErr)

go worker.Start(ctx, 1)
go worker.Start(ctx, 1, cond)

// Step 1: The operator should report that it is blocked on unverified content
//
@@ -1817,6 +1853,11 @@ func TestCVO_UpgradeVerifiedPayload(t *testing.T) {
})
}
func TestCVO_RestartAndReconcile(t *testing.T) {

cond := &configv1.ClusterOperatorStatusCondition{
Type: configv1.ClusterStatusConditionType("UpgradeableClusterVersionOverrides"),
Status: configv1.ConditionFalse,
}
o, cvs, client, _, shutdownFn := setupCVOTest("testdata/payloadtest")

ctx, cancel := context.WithCancel(context.Background())
@@ -1885,7 +1926,7 @@ func TestCVO_RestartAndReconcile(t *testing.T) {
// Step 2: Start the sync worker and verify the sequence of events, and then verify
// the status does not change
//
go worker.Start(ctx, 1)
go worker.Start(ctx, 1, cond)
//
verifyAllStatus(t, worker.StatusCh(),
SyncWorkerStatus{
@@ -2045,9 +2086,14 @@ func TestCVO_ErrorDuringReconcile(t *testing.T) {
t.Fatalf("The worker should be reconciling: %v", worker.work)
}

cond := &configv1.ClusterOperatorStatusCondition{
Type: configv1.ClusterStatusConditionType("UpgradeableClusterVersionOverrides"),
Status: configv1.ConditionFalse,
}

// Step 2: Start the sync worker and verify the sequence of events
//
go worker.Start(ctx, 1)
go worker.Start(ctx, 1, cond)
//
verifyAllStatus(t, worker.StatusCh(),
SyncWorkerStatus{
@@ -2182,6 +2228,11 @@ func TestCVO_ErrorDuringReconcile(t *testing.T) {
}

func TestCVO_ParallelError(t *testing.T) {

cond := &configv1.ClusterOperatorStatusCondition{
Type: configv1.ClusterStatusConditionType("UpgradeableClusterVersionOverrides"),
Status: configv1.ConditionFalse,
}
o, cvs, client, _, shutdownFn := setupCVOTest("testdata/paralleltest")

ctx, cancel := context.WithCancel(context.Background())
@@ -2251,7 +2302,7 @@ func TestCVO_ParallelError(t *testing.T) {
//
cancellable, cancel := context.WithCancel(ctx)
defer cancel()
go worker.Start(cancellable, 1)
go worker.Start(cancellable, 1, cond)
//
verifyAllStatus(t, worker.StatusCh(),
SyncWorkerStatus{
@@ -2469,7 +2520,7 @@ func verifyAllStatus(t *testing.T, ch <-chan SyncWorkerStatus, items ...SyncWork
t.Helper()
if len(items) == 0 {
if len(ch) > 0 {
t.Fatalf("expected status to empty, got %#v", <-ch)
t.Fatalf("expected status to be empty, got %#v", <-ch)
}
return
}
@@ -2487,7 +2538,6 @@ func verifyAllStatus(t *testing.T, ch <-chan SyncWorkerStatus, items ...SyncWork
} else if !lastTime.IsZero() {
actual.LastProgress = time.Unix(count, 0)
}

if !reflect.DeepEqual(expect, actual) {
t.Fatalf("unexpected status item %d: %s", i, diff.ObjectReflectDiff(expect, actual))
}
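Every test above builds the same condition literal before starting the worker. A small helper along these lines could remove the repetition; it is a hypothetical refactor, not part of this commit:

	// Hypothetical helper (not in this commit); each test currently inlines the literal.
	func overridesFalseCondition() *configv1.ClusterOperatorStatusCondition {
		return &configv1.ClusterOperatorStatusCondition{
			Type:   configv1.ClusterStatusConditionType("UpgradeableClusterVersionOverrides"),
			Status: configv1.ConditionFalse,
		}
	}

	// Usage in a test: go worker.Start(ctx, 1, overridesFalseCondition())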
5 changes: 3 additions & 2 deletions pkg/cvo/sync_test.go
@@ -387,7 +387,8 @@ func (r *fakeSyncRecorder) StatusCh() <-chan SyncWorkerStatus {
return ch
}

func (r *fakeSyncRecorder) Start(ctx context.Context, maxWorkers int) {}
func (r *fakeSyncRecorder) Start(ctx context.Context, maxWorkers int, statusCond *configv1.ClusterOperatorStatusCondition) {
}

func (r *fakeSyncRecorder) Update(generation int64, desired configv1.Update, overrides []configv1.ComponentOverride, state payload.State) *SyncWorkerStatus {
r.Updates = append(r.Updates, desired)
@@ -453,7 +454,7 @@ func (pf *testPrecondition) Name() string {
return fmt.Sprintf("TestPrecondition SuccessAfter: %d", pf.SuccessAfter)
}

func (pf *testPrecondition) Run(_ context.Context, _ precondition.ReleaseContext) error {
func (pf *testPrecondition) Run(_ context.Context, _ precondition.ReleaseContext, statusCond *configv1.ClusterOperatorStatusCondition) error {
if pf.SuccessAfter == 0 {
return nil
}
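The new statusCond parameter on testPrecondition.Run tracks the same change on the precondition interface that the sync worker's preconditions rely on. The production precondition implementations are not part of this diff, but one plausible way a precondition could consume the forwarded condition is sketched below (the overridesPrecondition type and its error text are hypothetical):

	// Hypothetical precondition (not in this commit) that uses the forwarded condition.
	type overridesPrecondition struct{}

	func (overridesPrecondition) Name() string { return "ClusterVersionOverrides" }

	func (overridesPrecondition) Run(ctx context.Context, rc precondition.ReleaseContext, statusCond *configv1.ClusterOperatorStatusCondition) error {
		// If the operator already determined that spec.overrides is set, report that
		// directly instead of re-reading the ClusterVersion object.
		if statusCond != nil && statusCond.Reason == "ClusterVersionOverridesSet" {
			return fmt.Errorf("cluster version overrides are set: %s", statusCond.Message)
		}
		return nil
	}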
60 changes: 29 additions & 31 deletions pkg/cvo/sync_worker.go
@@ -29,7 +29,7 @@ import (

// ConfigSyncWorker abstracts how the image is synchronized to the server. Introduced for testing.
type ConfigSyncWorker interface {
Start(ctx context.Context, maxWorkers int)
Start(ctx context.Context, maxWorkers int, statusCond *configv1.ClusterOperatorStatusCondition)
Update(generation int64, desired configv1.Update, overrides []configv1.ComponentOverride, state payload.State) *SyncWorkerStatus
StatusCh() <-chan SyncWorkerStatus
}
@@ -242,7 +242,7 @@ func (w *SyncWorker) Update(generation int64, desired configv1.Update, overrides
// Start periodically invokes run, detecting whether content has changed.
// It is edge-triggered when Update() is invoked and level-driven after the
// syncOnce() has succeeded for a given input (we are said to be "reconciling").
func (w *SyncWorker) Start(ctx context.Context, maxWorkers int) {
func (w *SyncWorker) Start(ctx context.Context, maxWorkers int, statusCond *configv1.ClusterOperatorStatusCondition) {
klog.V(5).Infof("Starting sync worker")

work := &SyncWork{}
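The comment above describes Start as edge-triggered on Update() and level-driven once a sync has succeeded. The generic shape of such a loop is roughly the following (an illustrative pattern only, not the actual SyncWorker.Start body; notify, resyncInterval, desired, and syncOnce are placeholder names):

	for {
		select {
		case <-ctx.Done():
			return
		case <-notify: // edge: Update() changed the desired state, react immediately
		case <-time.After(resyncInterval): // level: periodically re-apply the last desired state
		}
		if err := syncOnce(ctx, desired); err != nil {
			// back off, then wait for the next edge or the next tick
		}
	}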
@@ -311,7 +311,7 @@ func (w *SyncWorker) Start(ctx context.Context, maxWorkers int) {
// so that we don't fail, then immediately start reporting an earlier status
reporter := &statusWrapper{w: w, previousStatus: w.Status()}
klog.V(5).Infof("Previous sync status: %#v", reporter.previousStatus)
return w.syncOnce(ctx, work, maxWorkers, reporter)
return w.syncOnce(ctx, work, maxWorkers, reporter, statusCond)
}()
if err != nil {
// backoff wait
@@ -467,7 +467,7 @@ func (w *SyncWorker) Status() *SyncWorkerStatus {
// sync retrieves the image and applies it to the server, returning an error if
// the update could not be completely applied. The status is updated as we progress.
// Cancelling the context will abort the execution of the sync.
func (w *SyncWorker) syncOnce(ctx context.Context, work *SyncWork, maxWorkers int, reporter StatusReporter) error {
func (w *SyncWorker) syncOnce(ctx context.Context, work *SyncWork, maxWorkers int, reporter StatusReporter, statusCond *configv1.ClusterOperatorStatusCondition) error {
klog.V(4).Infof("Running sync %s (force=%t) on generation %d in state %s at attempt %d", versionString(work.Desired), work.Desired.Force, work.Generation, work.State, work.Attempt)
update := work.Desired

@@ -527,7 +527,7 @@ func (w *SyncWorker) syncOnce(ctx context.Context, work *SyncWork, maxWorkers in
Actual: update,
Verified: info.Verified,
})
if err := precondition.Summarize(w.preconditions.RunAll(ctx, precondition.ReleaseContext{DesiredVersion: payloadUpdate.ReleaseVersion})); err != nil {
if err := precondition.Summarize(w.preconditions.RunAll(ctx, precondition.ReleaseContext{DesiredVersion: payloadUpdate.ReleaseVersion}, statusCond)); err != nil {
klog.Infof("precondition error %v", err)
if update.Force {
klog.V(4).Infof("Forcing past precondition failures: %s", err)
@@ -646,37 +646,35 @@ func (w *SyncWorker) apply(ctx context.Context, payloadUpdate *payload.Update, w
})
}

	/*
		// update each object
		errs := payload.RunGraph(ctx, graph, maxWorkers, func(ctx context.Context, tasks []*payload.Task) error {
			for _, task := range tasks {
				if contextIsCancelled(ctx) {
					return cr.CancelError()
				}
				cr.Update()

				klog.V(4).Infof("Running sync for %s", task)
				klog.V(5).Infof("Manifest: %s", string(task.Manifest.Raw))

				ov, ok := getOverrideForManifest(work.Overrides, w.exclude, task.Manifest)
				if ok && ov.Unmanaged {
					klog.V(4).Infof("Skipping %s as unmanaged", task)
					continue
				}

				if err := task.Run(ctx, version, w.builder, work.State); err != nil {
					return err
				}
				cr.Inc()
				klog.V(4).Infof("Done syncing for %s", task)
			}
			return nil
		})
		if len(errs) > 0 {
			err := cr.Errors(errs)
			return err
		}
	*/
	// update each object
	errs := payload.RunGraph(ctx, graph, maxWorkers, func(ctx context.Context, tasks []*payload.Task) error {
		for _, task := range tasks {
			if contextIsCancelled(ctx) {
				return cr.CancelError()
			}
			cr.Update()

			klog.V(4).Infof("Running sync for %s", task)
			klog.V(5).Infof("Manifest: %s", string(task.Manifest.Raw))

			ov, ok := getOverrideForManifest(work.Overrides, w.exclude, task.Manifest)
			if ok && ov.Unmanaged {
				klog.V(4).Infof("Skipping %s as unmanaged", task)
				continue
			}

			if err := task.Run(ctx, version, w.builder, work.State); err != nil {
				return err
			}
			cr.Inc()
			klog.V(4).Infof("Done syncing for %s", task)
		}
		return nil
	})
	if len(errs) > 0 {
		err := cr.Errors(errs)
		return err
	}

// update the status
cr.Complete()
12 changes: 12 additions & 0 deletions pkg/cvo/upgradeable.go
@@ -19,6 +19,18 @@ import (
"github.com/openshift/cluster-version-operator/lib/resourcemerge"
)

func (optr *Operator) checkForClusterVersionOverridesSet() *configv1.ClusterOperatorStatusCondition {

for _, check := range optr.upgradeableChecks {
if cond := check.Check(); cond != nil {
if cond.Reason == "ClusterVersionOverridesSet" {
return cond
}
}
}
return nil
}

// syncUpgradeable. The status is only checked if it has been more than
// the minimumUpdateCheckInterval since the last check.
func (optr *Operator) syncUpgradeable(config *configv1.ClusterVersion) error {
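For context, the condition this new helper filters on is the one an upgradeable check reports when spec.overrides is non-empty. Its shape is roughly the following (the Message text is illustrative; the helper above only matches on the Reason):

	&configv1.ClusterOperatorStatusCondition{
		Type:    configv1.ClusterStatusConditionType("UpgradeableClusterVersionOverrides"),
		Status:  configv1.ConditionFalse,
		Reason:  "ClusterVersionOverridesSet",
		Message: "Disabling ownership via cluster version overrides prevents upgrades. Please remove overrides before continuing.",
	}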