diff --git a/components/ws-daemon/pkg/controller/snapshot_controller.go b/components/ws-daemon/pkg/controller/snapshot_controller.go index 8dbdef09315f8b..52796ea56ed357 100644 --- a/components/ws-daemon/pkg/controller/snapshot_controller.go +++ b/components/ws-daemon/pkg/controller/snapshot_controller.go @@ -11,6 +11,7 @@ import ( "k8s.io/client-go/util/retry" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller" "sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/log" "sigs.k8s.io/controller-runtime/pkg/predicate" @@ -21,15 +22,17 @@ import ( // SnapshotReconciler reconciles a Snapshot object type SnapshotReconciler struct { client.Client - nodeName string - operations *WorkspaceOperations + maxConcurrentReconciles int + nodeName string + operations *WorkspaceOperations } -func NewSnapshotController(c client.Client, nodeName string, wso *WorkspaceOperations) *SnapshotReconciler { +func NewSnapshotController(c client.Client, nodeName string, maxConcurrentReconciles int, wso *WorkspaceOperations) *SnapshotReconciler { return &SnapshotReconciler{ - Client: c, - nodeName: nodeName, - operations: wso, + Client: c, + maxConcurrentReconciles: maxConcurrentReconciles, + nodeName: nodeName, + operations: wso, } } @@ -37,6 +40,9 @@ func NewSnapshotController(c client.Client, nodeName string, wso *WorkspaceOpera func (r *SnapshotReconciler) SetupWithManager(mgr ctrl.Manager) error { return ctrl.NewControllerManagedBy(mgr). Named("snapshot"). + WithOptions(controller.Options{ + MaxConcurrentReconciles: r.maxConcurrentReconciles, + }). For(&workspacev1.Snapshot{}). WithEventFilter(snapshotEventFilter(r.nodeName)). Complete(r) diff --git a/components/ws-daemon/pkg/controller/workspace_controller.go b/components/ws-daemon/pkg/controller/workspace_controller.go index 5c3a9b2aaade95..36d07da388397a 100644 --- a/components/ws-daemon/pkg/controller/workspace_controller.go +++ b/components/ws-daemon/pkg/controller/workspace_controller.go @@ -26,6 +26,7 @@ import ( "k8s.io/client-go/util/retry" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller" "sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/log" "sigs.k8s.io/controller-runtime/pkg/predicate" @@ -49,20 +50,22 @@ type WorkspaceControllerOpts struct { type WorkspaceController struct { client.Client - NodeName string - operations *WorkspaceOperations - metrics *workspaceMetrics + NodeName string + maxConcurrentReconciles int + operations *WorkspaceOperations + metrics *workspaceMetrics } -func NewWorkspaceController(c client.Client, nodeName string, ops *WorkspaceOperations, reg prometheus.Registerer) (*WorkspaceController, error) { +func NewWorkspaceController(c client.Client, nodeName string, maxConcurrentReconciles int, ops *WorkspaceOperations, reg prometheus.Registerer) (*WorkspaceController, error) { metrics := newWorkspaceMetrics() reg.Register(metrics) return &WorkspaceController{ - Client: c, - NodeName: nodeName, - operations: ops, - metrics: metrics, + Client: c, + NodeName: nodeName, + maxConcurrentReconciles: maxConcurrentReconciles, + operations: ops, + metrics: metrics, }, nil } @@ -70,6 +73,9 @@ func NewWorkspaceController(c client.Client, nodeName string, ops *WorkspaceOper func (wsc *WorkspaceController) SetupWithManager(mgr ctrl.Manager) error { return ctrl.NewControllerManagedBy(mgr). Named("workspace"). + WithOptions(controller.Options{ + MaxConcurrentReconciles: wsc.maxConcurrentReconciles, + }). For(&workspacev1.Workspace{}). WithEventFilter(eventFilter(wsc.NodeName)). Complete(wsc) diff --git a/components/ws-daemon/pkg/daemon/config.go b/components/ws-daemon/pkg/daemon/config.go index ad7e94add0c9df..43e0bbcd31d4a0 100644 --- a/components/ws-daemon/pkg/daemon/config.go +++ b/components/ws-daemon/pkg/daemon/config.go @@ -33,8 +33,9 @@ type Config struct { } type WorkspaceControllerConfig struct { - Enabled bool `json:"enabled"` - WorkingAreaSuffix string `json:"workingAreaSuffix"` + Enabled bool `json:"enabled"` + WorkingAreaSuffix string `json:"workingAreaSuffix"` + MaxConcurrentReconciles int `json:"maxConcurrentReconciles,omitempty"` } type RuntimeConfig struct { diff --git a/components/ws-daemon/pkg/daemon/daemon.go b/components/ws-daemon/pkg/daemon/daemon.go index 7235f7594c338e..b582efda900e0a 100644 --- a/components/ws-daemon/pkg/daemon/daemon.go +++ b/components/ws-daemon/pkg/daemon/daemon.go @@ -207,7 +207,7 @@ func NewDaemon(config Config) (*Daemon, error) { return nil, err } - wsctrl, err := controller.NewWorkspaceController(mgr.GetClient(), nodename, workspaceOps, wrappedReg) + wsctrl, err := controller.NewWorkspaceController(mgr.GetClient(), nodename, config.WorkspaceController.MaxConcurrentReconciles, workspaceOps, wrappedReg) if err != nil { return nil, err } @@ -216,7 +216,7 @@ func NewDaemon(config Config) (*Daemon, error) { return nil, err } - ssctrl := controller.NewSnapshotController(mgr.GetClient(), nodename, workspaceOps) + ssctrl := controller.NewSnapshotController(mgr.GetClient(), nodename, config.WorkspaceController.MaxConcurrentReconciles, workspaceOps) err = ssctrl.SetupWithManager(mgr) if err != nil { return nil, err diff --git a/components/ws-manager-api/go/config/config.go b/components/ws-manager-api/go/config/config.go index 92366e5cc03bb8..3bf44eae9f9b18 100644 --- a/components/ws-manager-api/go/config/config.go +++ b/components/ws-manager-api/go/config/config.go @@ -125,6 +125,9 @@ type Configuration struct { WorkspaceClasses map[string]*WorkspaceClass `json:"workspaceClass"` // DebugWorkspacePod adds extra finalizer to workspace to prevent it from shutting down. Helps to debug. DebugWorkspacePod bool `json:"debugWorkspacePod,omitempty"` + // WorkspaceMaxConcurrentReconciles configures the max amount of concurrent workspace reconciliations on + // the workspace controller. + WorkspaceMaxConcurrentReconciles int `json:"workspaceMaxConcurrentReconciles,omitempty"` // TimeoutMaxConcurrentReconciles configures the max amount of concurrent workspace reconciliations on // the timeout controller. TimeoutMaxConcurrentReconciles int `json:"timeoutMaxConcurrentReconciles,omitempty"` diff --git a/components/ws-manager-mk2/controllers/status.go b/components/ws-manager-mk2/controllers/status.go index be3e2d0bb330bf..dbcfbeae27718f 100644 --- a/components/ws-manager-mk2/controllers/status.go +++ b/components/ws-manager-mk2/controllers/status.go @@ -191,7 +191,8 @@ func isDisposalFinished(ws *workspacev1.Workspace) bool { return wsk8s.ConditionPresentAndTrue(ws.Status.Conditions, string(workspacev1.WorkspaceConditionBackupComplete)) || wsk8s.ConditionPresentAndTrue(ws.Status.Conditions, string(workspacev1.WorkspaceConditionBackupFailure)) || wsk8s.ConditionPresentAndTrue(ws.Status.Conditions, string(workspacev1.WorkspaceConditionAborted)) || - wsk8s.ConditionWithStatusAndReason(ws.Status.Conditions, string(workspacev1.WorkspaceConditionContentReady), false, "InitializationFailure") + // Nothing to dispose if content wasn't ready. + !wsk8s.ConditionPresentAndTrue(ws.Status.Conditions, string(workspacev1.WorkspaceConditionContentReady)) } // extractFailure returns a pod failure reason and possibly a phase. If phase is nil then diff --git a/components/ws-manager-mk2/controllers/workspace_controller.go b/components/ws-manager-mk2/controllers/workspace_controller.go index c7be647155b1cd..fd6b34d1b6613e 100644 --- a/components/ws-manager-mk2/controllers/workspace_controller.go +++ b/components/ws-manager-mk2/controllers/workspace_controller.go @@ -15,6 +15,7 @@ import ( "k8s.io/apimachinery/pkg/runtime" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" "sigs.k8s.io/controller-runtime/pkg/log" @@ -360,6 +361,9 @@ func (r *WorkspaceReconciler) SetupWithManager(mgr ctrl.Manager) error { return ctrl.NewControllerManagedBy(mgr). Named("workspace"). + WithOptions(controller.Options{ + MaxConcurrentReconciles: r.Config.WorkspaceMaxConcurrentReconciles, + }). For(&workspacev1.Workspace{}). Owns(&corev1.Pod{}). Complete(r) diff --git a/components/ws-manager-mk2/controllers/workspace_controller_test.go b/components/ws-manager-mk2/controllers/workspace_controller_test.go index 5ca6fda23a845e..48a63638ff4e15 100644 --- a/components/ws-manager-mk2/controllers/workspace_controller_test.go +++ b/components/ws-manager-mk2/controllers/workspace_controller_test.go @@ -119,11 +119,28 @@ var _ = Describe("WorkspaceController", func() { }) }) + It("should not take a backup if content init did not happen", func() { + ws := newWorkspace(uuid.NewString(), "default") + m := collectMetricCounts(wsMetrics, ws) + pod := createWorkspaceExpectPod(ws) + + requestStop(ws) + + // No content init, expect cleanup without backup. + expectWorkspaceCleanup(ws, pod) + + expectMetricsDelta(m, collectMetricCounts(wsMetrics, ws), metricCounts{ + stops: 1, + }) + }) + It("should handle backup failure", func() { ws := newWorkspace(uuid.NewString(), "default") m := collectMetricCounts(wsMetrics, ws) pod := createWorkspaceExpectPod(ws) + markContentReady(ws) + // Stop the workspace. requestStop(ws) @@ -134,6 +151,7 @@ var _ = Describe("WorkspaceController", func() { expectWorkspaceCleanup(ws, pod) expectMetricsDelta(m, collectMetricCounts(wsMetrics, ws), metricCounts{ + restores: 1, backups: 1, backupFailures: 1, stops: 1, @@ -145,6 +163,8 @@ var _ = Describe("WorkspaceController", func() { m := collectMetricCounts(wsMetrics, ws) pod := createWorkspaceExpectPod(ws) + markContentReady(ws) + // Update Pod with failed exit status. updateObjWithRetries(k8sClient, pod, true, func(pod *corev1.Pod) { pod.Status.ContainerStatuses = append(pod.Status.ContainerStatuses, corev1.ContainerStatus{ @@ -165,6 +185,7 @@ var _ = Describe("WorkspaceController", func() { expectWorkspaceCleanup(ws, pod) expectMetricsDelta(m, collectMetricCounts(wsMetrics, ws), metricCounts{ + restores: 1, startFailures: 1, stops: 1, backups: 1, @@ -176,6 +197,8 @@ var _ = Describe("WorkspaceController", func() { m := collectMetricCounts(wsMetrics, ws) pod := createWorkspaceExpectPod(ws) + markContentReady(ws) + By("adding Timeout condition") updateObjWithRetries(k8sClient, ws, true, func(ws *workspacev1.Workspace) { ws.Status.Conditions = wsk8s.AddUniqueCondition(ws.Status.Conditions, metav1.Condition{ @@ -190,8 +213,9 @@ var _ = Describe("WorkspaceController", func() { expectWorkspaceCleanup(ws, pod) expectMetricsDelta(m, collectMetricCounts(wsMetrics, ws), metricCounts{ - stops: 1, - backups: 1, + restores: 1, + stops: 1, + backups: 1, }) }) @@ -200,6 +224,8 @@ var _ = Describe("WorkspaceController", func() { m := collectMetricCounts(wsMetrics, ws) pod := createWorkspaceExpectPod(ws) + markContentReady(ws) + // Update Pod with stop and abort conditions. updateObjWithRetries(k8sClient, ws, true, func(ws *workspacev1.Workspace) { ws.Status.Conditions = wsk8s.AddUniqueCondition(ws.Status.Conditions, metav1.Condition{ @@ -218,7 +244,8 @@ var _ = Describe("WorkspaceController", func() { expectWorkspaceCleanup(ws, pod) expectMetricsDelta(m, collectMetricCounts(wsMetrics, ws), metricCounts{ - stops: 1, + restores: 1, + stops: 1, }) }) @@ -227,6 +254,8 @@ var _ = Describe("WorkspaceController", func() { m := collectMetricCounts(wsMetrics, ws) pod := createWorkspaceExpectPod(ws) + markContentReady(ws) + Expect(k8sClient.Delete(ctx, ws)).To(Succeed()) expectPhaseEventually(ws, workspacev1.WorkspacePhaseStopping) @@ -236,8 +265,9 @@ var _ = Describe("WorkspaceController", func() { expectWorkspaceCleanup(ws, pod) expectMetricsDelta(m, collectMetricCounts(wsMetrics, ws), metricCounts{ - stops: 1, - backups: 1, + restores: 1, + stops: 1, + backups: 1, }) }) }) diff --git a/components/ws-manager-mk2/service/manager.go b/components/ws-manager-mk2/service/manager.go index eaf41d4bda0135..af71a909d90186 100644 --- a/components/ws-manager-mk2/service/manager.go +++ b/components/ws-manager-mk2/service/manager.go @@ -249,7 +249,7 @@ func (wsm *WorkspaceManagerServer) StartWorkspace(ctx context.Context, req *wsma } var wsr workspacev1.Workspace - err = wait.PollWithContext(ctx, 100*time.Millisecond, 5*time.Second, func(c context.Context) (done bool, err error) { + err = wait.PollWithContext(ctx, 100*time.Millisecond, 15*time.Second, func(c context.Context) (done bool, err error) { err = wsm.Client.Get(ctx, types.NamespacedName{Namespace: wsm.Config.Namespace, Name: ws.Name}, &wsr) if err != nil { return false, nil diff --git a/dev/loadgen/cmd/benchmark.go b/dev/loadgen/cmd/benchmark.go index 8b0e540b0b4fcf..f4ff348a0da3fa 100644 --- a/dev/loadgen/cmd/benchmark.go +++ b/dev/loadgen/cmd/benchmark.go @@ -55,7 +55,7 @@ var benchmarkCommand = &cobra.Command{ } var load loadgen.LoadGenerator - load = loadgen.NewFixedLoadGenerator(500*time.Millisecond, 300*time.Millisecond) + load = loadgen.NewFixedLoadGenerator(800*time.Millisecond, 300*time.Millisecond) load = loadgen.NewWorkspaceCountLimitingGenerator(load, scenario.Workspaces) template := &api.StartWorkspaceRequest{ diff --git a/dev/loadgen/pkg/loadgen/executor.go b/dev/loadgen/pkg/loadgen/executor.go index 0dfa815e751916..d342b9c6f3d3c5 100644 --- a/dev/loadgen/pkg/loadgen/executor.go +++ b/dev/loadgen/pkg/loadgen/executor.go @@ -222,9 +222,12 @@ func (w *WsmanExecutor) StopAll(ctx context.Context) error { if err != nil { log.Warnf("could not get workspaces: %v", err) } else { - if len(resp.GetStatus()) == 0 { + n := len(resp.GetStatus()) + if n == 0 { break } + ex := resp.GetStatus()[0] + log.Infof("%d workspaces remaining, e.g. %s", n, ex.Id) } select { diff --git a/install/installer/pkg/components/ws-daemon/configmap.go b/install/installer/pkg/components/ws-daemon/configmap.go index b0780ef361604b..21274eb01a2180 100644 --- a/install/installer/pkg/components/ws-daemon/configmap.go +++ b/install/installer/pkg/components/ws-daemon/configmap.go @@ -103,6 +103,7 @@ func configmap(ctx *common.RenderContext) ([]runtime.Object, error) { wscontroller.Enabled = ucfg.Workspace.UseWsmanagerMk2 wscontroller.WorkingAreaSuffix = "-mk2" + wscontroller.MaxConcurrentReconciles = 15 return nil }) diff --git a/install/installer/pkg/components/ws-manager-mk2/configmap.go b/install/installer/pkg/components/ws-manager-mk2/configmap.go index 5e895f16c5754f..f0604fd9c7bb26 100644 --- a/install/installer/pkg/components/ws-manager-mk2/configmap.go +++ b/install/installer/pkg/components/ws-manager-mk2/configmap.go @@ -221,10 +221,11 @@ func configmap(ctx *common.RenderContext) ([]runtime.Object, error) { Interrupted: util.Duration(5 * time.Minute), }, //EventTraceLog: "", // todo(sje): make conditional based on config - ReconnectionInterval: util.Duration(30 * time.Second), - RegistryFacadeHost: fmt.Sprintf("reg.%s:%d", ctx.Config.Domain, common.RegistryFacadeServicePort), - WorkspaceCACertSecret: customCASecret, - TimeoutMaxConcurrentReconciles: 5, + ReconnectionInterval: util.Duration(30 * time.Second), + RegistryFacadeHost: fmt.Sprintf("reg.%s:%d", ctx.Config.Domain, common.RegistryFacadeServicePort), + WorkspaceCACertSecret: customCASecret, + WorkspaceMaxConcurrentReconciles: 15, + TimeoutMaxConcurrentReconciles: 15, }, Content: struct { Storage storageconfig.StorageConfig `json:"storage"`