diff --git a/components/ws-manager-api/go/config/config.go b/components/ws-manager-api/go/config/config.go
index 1bfd514b9f2d7f..92366e5cc03bb8 100644
--- a/components/ws-manager-api/go/config/config.go
+++ b/components/ws-manager-api/go/config/config.go
@@ -125,6 +125,9 @@ type Configuration struct {
 	WorkspaceClasses map[string]*WorkspaceClass `json:"workspaceClass"`
 	// DebugWorkspacePod adds extra finalizer to workspace to prevent it from shutting down. Helps to debug.
 	DebugWorkspacePod bool `json:"debugWorkspacePod,omitempty"`
+	// TimeoutMaxConcurrentReconciles configures the maximum number of workspace
+	// reconciliations that can run concurrently on the timeout controller.
+	TimeoutMaxConcurrentReconciles int `json:"timeoutMaxConcurrentReconciles,omitempty"`
 }
 
 type WorkspaceClass struct {
diff --git a/components/ws-manager-api/go/crd/v1/workspace_types.go b/components/ws-manager-api/go/crd/v1/workspace_types.go
index 06d4e1c0d3a7e2..cd5fc30f4daddb 100644
--- a/components/ws-manager-api/go/crd/v1/workspace_types.go
+++ b/components/ws-manager-api/go/crd/v1/workspace_types.go
@@ -145,7 +145,10 @@ const (
 	WorkspaceConditionTimeout WorkspaceCondition = "Timeout"
 
-	// UserActivity is the time when MarkActive was first called on the workspace
-	WorkspaceConditionUserActivity WorkspaceCondition = "UserActivity"
+	// FirstUserActivity is the time when MarkActive was first called on the workspace
+	WorkspaceConditionFirstUserActivity WorkspaceCondition = "FirstUserActivity"
+
+	// Closed indicates that a workspace is marked as closed. This will shorten its timeout.
+	WorkspaceConditionClosed WorkspaceCondition = "Closed"
 
 	// HeadlessTaskFailed indicates that a headless workspace task failed
 	WorkspaceConditionsHeadlessTaskFailed WorkspaceCondition = "HeadlessTaskFailed"
diff --git a/components/ws-manager-mk2/controllers/status.go b/components/ws-manager-mk2/controllers/status.go
index 456210b7de3c90..3593e260880658 100644
--- a/components/ws-manager-mk2/controllers/status.go
+++ b/components/ws-manager-mk2/controllers/status.go
@@ -287,3 +287,8 @@ func isPodBeingDeleted(pod *corev1.Pod) bool {
 	// if the pod is being deleted the only marker we have is that the deletionTimestamp is set
 	return pod.ObjectMeta.DeletionTimestamp != nil
 }
+
+// isWorkspaceBeingDeleted returns true if the workspace resource is currently being deleted.
+func isWorkspaceBeingDeleted(ws *workspacev1.Workspace) bool {
+	return ws.ObjectMeta.DeletionTimestamp != nil
+}
diff --git a/components/ws-manager-mk2/controllers/suite_test.go b/components/ws-manager-mk2/controllers/suite_test.go
index 87cd63a89b439a..a520a641d8dc3e 100644
--- a/components/ws-manager-mk2/controllers/suite_test.go
+++ b/components/ws-manager-mk2/controllers/suite_test.go
@@ -8,6 +8,7 @@ import (
 	"context"
 	"path/filepath"
 	"testing"
+	"time"
 
 	. "github.com/onsi/ginkgo"
"github.com/onsi/gomega" @@ -19,6 +20,8 @@ import ( logf "sigs.k8s.io/controller-runtime/pkg/log" "sigs.k8s.io/controller-runtime/pkg/log/zap" + "github.com/gitpod-io/gitpod/common-go/util" + "github.com/gitpod-io/gitpod/ws-manager-mk2/pkg/activity" "github.com/gitpod-io/gitpod/ws-manager/api/config" workspacev1 "github.com/gitpod-io/gitpod/ws-manager/api/crd/v1" //+kubebuilder:scaffold:imports @@ -103,6 +106,25 @@ var _ = BeforeSuite(func() { }).SetupWithManager(k8sManager) Expect(err).ToNot(HaveOccurred()) + activity := activity.WorkspaceActivity{} + timeoutReconciler, err := NewTimeoutReconciler(k8sManager.GetClient(), config.Configuration{ + Timeouts: config.WorkspaceTimeoutConfiguration{ + TotalStartup: util.Duration(1 * time.Minute), + Initialization: util.Duration(1 * time.Minute), + RegularWorkspace: util.Duration(1 * time.Minute), + MaxLifetime: util.Duration(1 * time.Minute), + HeadlessWorkspace: util.Duration(1 * time.Minute), + AfterClose: util.Duration(1 * time.Minute), + ContentFinalization: util.Duration(1 * time.Minute), + Stopping: util.Duration(1 * time.Minute), + Interrupted: util.Duration(1 * time.Minute), + }, + TimeoutMaxConcurrentReconciles: 2, + }, &activity) + Expect(err).ToNot(HaveOccurred()) + err = timeoutReconciler.SetupWithManager(k8sManager) + Expect(err).ToNot(HaveOccurred()) + ctx, cancel = context.WithCancel(context.TODO()) go func() { diff --git a/components/ws-manager-mk2/controllers/timeout_controller.go b/components/ws-manager-mk2/controllers/timeout_controller.go new file mode 100644 index 00000000000000..fb86384cf55e86 --- /dev/null +++ b/components/ws-manager-mk2/controllers/timeout_controller.go @@ -0,0 +1,240 @@ +// Copyright (c) 2022 Gitpod GmbH. All rights reserved. +// Licensed under the GNU Affero General Public License (AGPL). +// See License-AGPL.txt in the project root for license information. + +package controllers + +import ( + "context" + "fmt" + "time" + + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller" + "sigs.k8s.io/controller-runtime/pkg/log" + + "github.com/gitpod-io/gitpod/common-go/util" + wsactivity "github.com/gitpod-io/gitpod/ws-manager-mk2/pkg/activity" + config "github.com/gitpod-io/gitpod/ws-manager/api/config" + workspacev1 "github.com/gitpod-io/gitpod/ws-manager/api/crd/v1" +) + +func NewTimeoutReconciler(c client.Client, cfg config.Configuration, activity *wsactivity.WorkspaceActivity) (*TimeoutReconciler, error) { + reconcileInterval := time.Duration(cfg.HeartbeatInterval) + // Reconcile interval is half the heartbeat interval to catch timed out workspaces in time. + // See https://en.wikipedia.org/wiki/Nyquist%E2%80%93Shannon_sampling_theorem why we need this. + reconcileInterval /= 2 + + return &TimeoutReconciler{ + Client: c, + Config: cfg, + activity: activity, + reconcileInterval: reconcileInterval, + ctrlStartTime: time.Now().UTC(), + }, nil +} + +// TimeoutReconciler reconciles workspace timeouts. This is a separate reconciler, as it +// always requeues events for existing workspaces such that timeouts are checked on (at least) +// a specified interval. The reconcile loop should therefore be light-weight as it's repeatedly +// reconciling all workspaces in the cluster. 
+type TimeoutReconciler struct { + client.Client + + Config config.Configuration + activity *wsactivity.WorkspaceActivity + reconcileInterval time.Duration + ctrlStartTime time.Time +} + +//+kubebuilder:rbac:groups=workspace.gitpod.io,resources=workspaces,verbs=get;list;watch;create;update;patch;delete +//+kubebuilder:rbac:groups=workspace.gitpod.io,resources=workspaces/status,verbs=get;update;patch + +// Reconcile will check the given workspace for timing out. When done, a new event gets +// requeued automatically to ensure the workspace gets reconciled at least every reconcileInterval. +func (r *TimeoutReconciler) Reconcile(ctx context.Context, req ctrl.Request) (result ctrl.Result, err error) { + log := log.FromContext(ctx).WithValues("ws", req.NamespacedName) + + var workspace workspacev1.Workspace + if err := r.Get(ctx, req.NamespacedName, &workspace); err != nil { + if !apierrors.IsNotFound(err) { + log.Error(err, "unable to fetch workspace") + } + // We'll ignore not-found errors, since they can't be fixed by an immediate + // requeue (we'll need to wait for a new notification), and we can get them + // on deleted requests. + // On any other error, let the controller requeue an event with exponential + // backoff. + return ctrl.Result{}, client.IgnoreNotFound(err) + } + + if conditionPresentAndTrue(workspace.Status.Conditions, string(workspacev1.WorkspaceConditionTimeout)) { + // Workspace has already been marked as timed out. + // Return and don't requeue another reconciliation. + return ctrl.Result{}, nil + } + + // The workspace hasn't timed out yet. After this point, we always + // want to requeue a reconciliation after the configured interval. + defer func() { + result.RequeueAfter = r.reconcileInterval + }() + + timedout, err := r.isWorkspaceTimedOut(&workspace) + if err != nil { + log.Error(err, "failed to check for workspace timeout") + return ctrl.Result{}, err + } + + if timedout == "" { + // Hasn't timed out. + return ctrl.Result{}, nil + } + + // Workspace timed out, set Timeout condition. + log.Info("Workspace timed out", "reason", timedout) + workspace.Status.Conditions = AddUniqueCondition(workspace.Status.Conditions, metav1.Condition{ + Type: string(workspacev1.WorkspaceConditionTimeout), + Status: metav1.ConditionTrue, + LastTransitionTime: metav1.Now(), + Reason: "TimedOut", + Message: timedout, + }) + + if err = r.Client.Status().Update(ctx, &workspace); err != nil { + log.Error(err, "Failed to update workspace status with Timeout condition") + return ctrl.Result{}, err + } + return ctrl.Result{}, nil +} + +type timeoutActivity string + +const ( + activityInit timeoutActivity = "initialization" + activityStartup timeoutActivity = "startup" + activityCreatingContainers timeoutActivity = "creating containers" + activityPullingImages timeoutActivity = "pulling images" + activityRunningHeadless timeoutActivity = "running the headless workspace" + activityNone timeoutActivity = "period of inactivity" + activityMaxLifetime timeoutActivity = "maximum lifetime" + activityClosed timeoutActivity = "after being closed" + activityInterrupted timeoutActivity = "workspace interruption" + activityStopping timeoutActivity = "stopping" + activityBackup timeoutActivity = "backup" +) + +// isWorkspaceTimedOut determines if a workspace is timed out based on the manager configuration and state the pod is in. +// This function does NOT use the Timeout condition, but rather is used to set that condition in the first place. 
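+//
+// For example (illustrative values only): a Pending workspace created 90 minutes ago
+// with an Initialization timeout of one hour yields the reason
+// "workspace timed out: initialization (01h30m) exceeded the 01h00m limit",
+// whereas an empty reason means the workspace has not (yet) timed out.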
+func (r *TimeoutReconciler) isWorkspaceTimedOut(ws *workspacev1.Workspace) (reason string, err error) {
+	timeouts := r.Config.Timeouts
+	phase := ws.Status.Phase
+
+	decide := func(start time.Time, timeout util.Duration, activity timeoutActivity) (string, error) {
+		td := time.Duration(timeout)
+		elapsed := time.Since(start)
+		if elapsed < td {
+			return "", nil
+		}
+
+		return fmt.Sprintf("workspace timed out: %s (%s) exceeded the %s limit", activity, formatDuration(elapsed), formatDuration(td)), nil
+	}
+
+	start := ws.ObjectMeta.CreationTimestamp.Time
+	lastActivity := r.activity.GetLastActivity(ws.Name)
+	isClosed := conditionPresentAndTrue(ws.Status.Conditions, string(workspacev1.WorkspaceConditionClosed))
+
+	switch phase {
+	case workspacev1.WorkspacePhasePending:
+		return decide(start, timeouts.Initialization, activityInit)
+
+	case workspacev1.WorkspacePhaseInitializing:
+		return decide(start, timeouts.TotalStartup, activityStartup)
+
+	case workspacev1.WorkspacePhaseCreating:
+		activity := activityCreatingContainers
+		// TODO:
+		// if status.Conditions.PullingImages == api.WorkspaceConditionBool_TRUE {
+		// 	activity = activityPullingImages
+		// }
+		return decide(start, timeouts.TotalStartup, activity)
+
+	case workspacev1.WorkspacePhaseRunning:
+		// The first check is always for the max lifetime
+		if msg, err := decide(start, timeouts.MaxLifetime, activityMaxLifetime); msg != "" {
+			return msg, err
+		}
+
+		timeout := timeouts.RegularWorkspace
+		if ctv := ws.Spec.Timeout.Time; ctv != nil {
+			timeout = util.Duration(ctv.Duration)
+		}
+		activity := activityNone
+		if ws.Status.Headless {
+			timeout = timeouts.HeadlessWorkspace
+			lastActivity = &start
+			activity = activityRunningHeadless
+		} else if lastActivity == nil {
+			// The workspace is up and running, but the user has never produced any activity, OR the controller
+			// has restarted and not yet received a heartbeat for this workspace (since heartbeats are stored
+			// in-memory and reset on restart).
+			// First check whether the controller has restarted during this workspace's lifetime.
+			// If it has, use the FirstUserActivity condition to determine whether there had already been any
+			// user activity before the controller restart.
+			// If the controller started before the workspace, then the user hasn't produced any activity yet.
+			if r.ctrlStartTime.After(start) && conditionPresentAndTrue(ws.Status.Conditions, string(workspacev1.WorkspaceConditionFirstUserActivity)) {
+				// The controller restarted during this workspace's lifetime, and the workspace had activity before
+				// the restart, so the last activity has been lost on restart. Therefore, "reset" the timeout and
+				// measure only from the controller's start time.
+				start = r.ctrlStartTime
+			} else {
+				// This workspace hasn't had any user activity yet (not even before a potential controller restart).
+				// So check for a startup timeout, measured from the workspace's creation time.
+				timeout = timeouts.TotalStartup
+			}
+			return decide(start, timeout, activityNone)
+		} else if isClosed {
+			return decide(*lastActivity, timeouts.AfterClose, activityClosed)
+		}
+		return decide(*lastActivity, timeout, activity)
+
+	case workspacev1.WorkspacePhaseStopping:
+		if isWorkspaceBeingDeleted(ws) && conditionPresentAndTrue(ws.Status.Conditions, string(workspacev1.WorkspaceConditionBackupComplete)) {
+			// Beware: we apply the ContentFinalization timeout only to workspaces which are currently being deleted.
+			// We don't expect a workspace to be in content finalization unless it's already being deleted.
+			return decide(ws.DeletionTimestamp.Time, timeouts.ContentFinalization, activityBackup)
+		} else if !isWorkspaceBeingDeleted(ws) {
+			// workspaces that are not being deleted do not time out while stopping
+			return "", nil
+		} else {
+			return decide(ws.DeletionTimestamp.Time, timeouts.Stopping, activityStopping)
+		}
+
+	default:
+		// The only other phase we can be in is Stopped, which there is no point in timing out.
+		return "", nil
+	}
+}
+
+func formatDuration(d time.Duration) string {
+	d = d.Round(time.Minute)
+	h := d / time.Hour
+	d -= h * time.Hour
+	m := d / time.Minute
+	return fmt.Sprintf("%02dh%02dm", h, m)
+}
+
+// SetupWithManager sets up the controller with the Manager.
+func (r *TimeoutReconciler) SetupWithManager(mgr ctrl.Manager) error {
+	maxConcurrentReconciles := r.Config.TimeoutMaxConcurrentReconciles
+	if maxConcurrentReconciles <= 0 {
+		maxConcurrentReconciles = 1
+	}
+
+	return ctrl.NewControllerManagedBy(mgr).
+		WithOptions(controller.Options{MaxConcurrentReconciles: maxConcurrentReconciles}).
+		For(&workspacev1.Workspace{}).
+		Complete(r)
+}
diff --git a/components/ws-manager-mk2/controllers/timeout_controller_test.go b/components/ws-manager-mk2/controllers/timeout_controller_test.go
new file mode 100644
index 00000000000000..5a61faa2b48ab8
--- /dev/null
+++ b/components/ws-manager-mk2/controllers/timeout_controller_test.go
@@ -0,0 +1,126 @@
+// Copyright (c) 2022 Gitpod GmbH. All rights reserved.
+// Licensed under the GNU Affero General Public License (AGPL).
+// See License-AGPL.txt in the project root for license information.
+
+package controllers
+
+import (
+	"context"
+	"fmt"
+	"time"
+
+	"github.com/aws/smithy-go/ptr"
+	workspacev1 "github.com/gitpod-io/gitpod/ws-manager/api/crd/v1"
+	. "github.com/onsi/ginkgo"
+	. "github.com/onsi/gomega"
+
+	// . "github.com/onsi/ginkgo/extensions/table"
+
+	corev1 "k8s.io/api/core/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/types"
+)
+
+var _ = Describe("TimeoutController", func() {
+	Context("When workspace is active", func() {
+		It("Should not time out on controller (re)start", func() {
+			const (
+				WorkspaceName      = "test-workspace"
+				WorkspaceNamespace = "default"
+
+				timeout  = time.Second * 10
+				duration = time.Second * 2
+				interval = time.Millisecond * 250
+			)
+
+			By("creating a workspace")
+
+			ctx := context.Background()
+			workspace := &workspacev1.Workspace{
+				TypeMeta: metav1.TypeMeta{
+					APIVersion: "workspace.gitpod.io/v1",
+					Kind:       "Workspace",
+				},
+				ObjectMeta: metav1.ObjectMeta{
+					Name:      WorkspaceName,
+					Namespace: WorkspaceNamespace,
+				},
+				Spec: workspacev1.WorkspaceSpec{
+					Ownership: workspacev1.Ownership{
+						Owner:       "foobar",
+						WorkspaceID: "cool-workspace",
+					},
+					Type:  workspacev1.WorkspaceTypeRegular,
+					Class: "default",
+					Image: workspacev1.WorkspaceImages{
+						Workspace: workspacev1.WorkspaceImage{
+							Ref: ptr.String("alpine:latest"),
+						},
+						IDE: workspacev1.IDEImages{
+							Refs: []string{},
+						},
+					},
+					Ports:       []workspacev1.PortSpec{},
+					Initializer: []byte("abc"),
+					Admission: workspacev1.AdmissionSpec{
+						Level: workspacev1.AdmissionLevelEveryone,
+					},
+				},
+			}
+			Expect(k8sClient.Create(ctx, workspace)).Should(Succeed())
+
+			By("creating a pod")
+			podLookupKey := types.NamespacedName{Name: "ws-" + WorkspaceName, Namespace: WorkspaceNamespace}
+			createdPod := &corev1.Pod{}
+
+			// We'll need to retry getting this newly created Pod, given that creation may not happen immediately.
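+			// Gomega's Eventually polls the function below every `interval` (250ms) until it
+			// returns true or `timeout` (10s) elapses, so transient NotFound errors while the
+			// controller is still creating the pod don't fail the test.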
+ Eventually(func() bool { + err := k8sClient.Get(ctx, podLookupKey, createdPod) + return err == nil + }, timeout, interval).Should(BeTrue()) + + By("updating the pod starts value") + createdWS := &workspacev1.Workspace{} + Eventually(func() (int, error) { + err := k8sClient.Get(ctx, types.NamespacedName{Name: WorkspaceName, Namespace: WorkspaceNamespace}, createdWS) + if err != nil { + return 0, err + } + + return createdWS.Status.PodStarts, nil + }, timeout, interval).Should(Equal(1)) + + By("Creating the pod only once") + // TODO(cw): remove this hell of a hack once we're on PVC and no longer need to rely on concurrent init processes. + // Here we assume that the controller will have deleted the pod because it failed, + // and we removed the finalizer. + createdPod.Finalizers = []string{} + Expect(k8sClient.Update(ctx, createdPod)).To(Succeed()) + Expect(k8sClient.Delete(ctx, createdPod)).To(Succeed()) + Eventually(func() bool { + err := k8sClient.Get(ctx, podLookupKey, createdPod) + if err != nil { + // TODO(cw): check if this is a not found error + // We have an error and assume we did not find the pod. This is what we want. + return true + } + + fmt.Println(createdPod.ResourceVersion) + return false + }, timeout, interval).Should(BeTrue(), "pod did not go away") + + // Now we make sure the pod doesn't come back + Consistently(func() bool { + err := k8sClient.Get(ctx, podLookupKey, createdPod) + if err != nil { + // TODO(cw): check if this is a not found error + // We have an error and assume we did not find the pod. This is what we want. + return true + } + + fmt.Println(createdPod.ResourceVersion) + return false + }, duration, interval).Should(BeTrue(), "pod came back") + }) + }) +}) diff --git a/components/ws-manager-mk2/controllers/workspace_controller.go b/components/ws-manager-mk2/controllers/workspace_controller.go index 1f493156d86037..e866eeee3e0f52 100644 --- a/components/ws-manager-mk2/controllers/workspace_controller.go +++ b/components/ws-manager-mk2/controllers/workspace_controller.go @@ -75,7 +75,9 @@ func (r *WorkspaceReconciler) Reconcile(ctx context.Context, req ctrl.Request) ( var workspace workspacev1.Workspace if err := r.Get(ctx, req.NamespacedName, &workspace); err != nil { - log.Error(err, "unable to fetch workspace") + if !errors.IsNotFound(err) { + log.Error(err, "unable to fetch workspace") + } // we'll ignore not-found errors, since they can't be fixed by an immediate // requeue (we'll need to wait for a new notification), and we can get them // on deleted requests. 
@@ -191,6 +193,15 @@ func (r *WorkspaceReconciler) actOnStatus(ctx context.Context, workspace *worksp return ctrl.Result{Requeue: true}, err } + // if the workspace timed out, delete it + case conditionPresentAndTrue(workspace.Status.Conditions, string(workspacev1.WorkspaceConditionTimeout)) && !isPodBeingDeleted(pod): + err := r.Client.Delete(ctx, pod) + if errors.IsNotFound(err) { + // pod is gone - nothing to do here + } else { + return ctrl.Result{Requeue: true}, err + } + // if the content initialization failed, delete the pod case conditionWithStatusAndReson(workspace.Status.Conditions, string(workspacev1.WorkspaceConditionContentReady), false, "InitializationFailure") && !isPodBeingDeleted(pod): err := r.Client.Delete(ctx, pod) diff --git a/components/ws-manager-mk2/main.go b/components/ws-manager-mk2/main.go index c27f0db94622d1..56d8650162addc 100644 --- a/components/ws-manager-mk2/main.go +++ b/components/ws-manager-mk2/main.go @@ -39,6 +39,7 @@ import ( "github.com/gitpod-io/gitpod/common-go/pprof" regapi "github.com/gitpod-io/gitpod/registry-facade/api" "github.com/gitpod-io/gitpod/ws-manager-mk2/controllers" + "github.com/gitpod-io/gitpod/ws-manager-mk2/pkg/activity" "github.com/gitpod-io/gitpod/ws-manager-mk2/service" wsmanapi "github.com/gitpod-io/gitpod/ws-manager/api" config "github.com/gitpod-io/gitpod/ws-manager/api/config" @@ -102,8 +103,14 @@ func main() { setupLog.Error(err, "unable to create controller", "controller", "Workspace") os.Exit(1) } + activity := &activity.WorkspaceActivity{} + timeoutReconciler, err := controllers.NewTimeoutReconciler(mgr.GetClient(), cfg.Manager, activity) + if err != nil { + setupLog.Error(err, "unable to create timeout controller", "controller", "Timeout") + os.Exit(1) + } - wsmanService, err := setupGRPCService(cfg, mgr.GetClient()) + wsmanService, err := setupGRPCService(cfg, mgr.GetClient(), activity) if err != nil { setupLog.Error(err, "unable to start manager service") os.Exit(1) @@ -111,7 +118,11 @@ func main() { reconciler.OnReconcile = wsmanService.OnWorkspaceReconcile if err = reconciler.SetupWithManager(mgr); err != nil { - setupLog.Error(err, "unable to create controller", "controller", "Workspace") + setupLog.Error(err, "unable to set up workspace controller with manager", "controller", "Workspace") + os.Exit(1) + } + if err = timeoutReconciler.SetupWithManager(mgr); err != nil { + setupLog.Error(err, "unable to set up timeout controller with manager", "controller", "Timeout") os.Exit(1) } @@ -137,7 +148,7 @@ func main() { } } -func setupGRPCService(cfg *config.ServiceConfiguration, k8s client.Client) (*service.WorkspaceManagerServer, error) { +func setupGRPCService(cfg *config.ServiceConfiguration, k8s client.Client, activity *activity.WorkspaceActivity) (*service.WorkspaceManagerServer, error) { // TODO(cw): remove use of common-go/log if len(cfg.RPCServer.RateLimits) > 0 { @@ -170,7 +181,7 @@ func setupGRPCService(cfg *config.ServiceConfiguration, k8s client.Client) (*ser grpcOpts = append(grpcOpts, grpc.UnknownServiceHandler(proxy.TransparentHandler(imagebuilderDirector(cfg.ImageBuilderProxy.TargetAddr)))) - srv := service.NewWorkspaceManagerServer(k8s, &cfg.Manager, metrics.Registry) + srv := service.NewWorkspaceManagerServer(k8s, &cfg.Manager, metrics.Registry, activity) grpcServer := grpc.NewServer(grpcOpts...) 
grpc_prometheus.Register(grpcServer) diff --git a/components/ws-manager-mk2/pkg/activity/activity.go b/components/ws-manager-mk2/pkg/activity/activity.go new file mode 100644 index 00000000000000..b52ef9b27ecd6b --- /dev/null +++ b/components/ws-manager-mk2/pkg/activity/activity.go @@ -0,0 +1,29 @@ +// Copyright (c) 2023 Gitpod GmbH. All rights reserved. +// Licensed under the GNU Affero General Public License (AGPL). +// See License.AGPL.txt in the project root for license information. + +package activity + +import ( + "sync" + "time" +) + +// WorkspaceActivity is used to track the last user activity per workspace. This is +// stored in memory instead of on the Workspace resource to limit load on the k8s API, +// as this value will update often for each workspace. +type WorkspaceActivity struct { + m sync.Map +} + +func (w *WorkspaceActivity) Store(workspaceId string, lastActivity time.Time) { + w.m.Store(workspaceId, &lastActivity) +} + +func (w *WorkspaceActivity) GetLastActivity(workspaceId string) *time.Time { + lastActivity, ok := w.m.Load(workspaceId) + if ok { + return lastActivity.(*time.Time) + } + return nil +} diff --git a/components/ws-manager-mk2/service/manager.go b/components/ws-manager-mk2/service/manager.go index 482dec10d030cd..daa310db6f5639 100644 --- a/components/ws-manager-mk2/service/manager.go +++ b/components/ws-manager-mk2/service/manager.go @@ -24,6 +24,7 @@ import ( wsk8s "github.com/gitpod-io/gitpod/common-go/kubernetes" "github.com/gitpod-io/gitpod/common-go/log" "github.com/gitpod-io/gitpod/common-go/tracing" + "github.com/gitpod-io/gitpod/ws-manager-mk2/pkg/activity" "github.com/gitpod-io/gitpod/ws-manager/api" wsmanapi "github.com/gitpod-io/gitpod/ws-manager/api" "github.com/gitpod-io/gitpod/ws-manager/api/config" @@ -41,14 +42,15 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client" ) -func NewWorkspaceManagerServer(clnt client.Client, cfg *config.Configuration, reg prometheus.Registerer) *WorkspaceManagerServer { +func NewWorkspaceManagerServer(clnt client.Client, cfg *config.Configuration, reg prometheus.Registerer, activity *activity.WorkspaceActivity) *WorkspaceManagerServer { metrics := newWorkspaceMetrics() reg.MustRegister(metrics) return &WorkspaceManagerServer{ - Client: clnt, - Config: cfg, - metrics: metrics, + Client: clnt, + Config: cfg, + metrics: metrics, + activity: activity, subs: subscriptions{ subscribers: make(map[string]chan *wsmanapi.SubscribeResponse), }, @@ -56,9 +58,10 @@ func NewWorkspaceManagerServer(clnt client.Client, cfg *config.Configuration, re } type WorkspaceManagerServer struct { - Client client.Client - Config *config.Configuration - metrics *workspaceMetrics + Client client.Client + Config *config.Configuration + metrics *workspaceMetrics + activity *activity.WorkspaceActivity subs subscriptions wsmanapi.UnimplementedWorkspaceManagerServer @@ -280,10 +283,15 @@ func (wsm *WorkspaceManagerServer) DescribeWorkspace(ctx context.Context, req *w return nil, status.Errorf(codes.Internal, "cannot lookup workspace: %v", err) } - return &wsmanapi.DescribeWorkspaceResponse{ + result := &wsmanapi.DescribeWorkspaceResponse{ Status: extractWorkspaceStatus(&ws), - // TODO(cw): Add lastActivity - }, nil + } + + lastActivity := wsm.activity.GetLastActivity(req.Id) + if lastActivity != nil { + result.LastActivity = lastActivity.UTC().Format(time.RFC3339Nano) + } + return result, nil } // Subscribe streams all status updates to a client @@ -296,8 +304,87 @@ func (m *WorkspaceManagerServer) Subscribe(req *api.SubscribeRequest, srv 
api.Wo
 	return m.subs.Subscribe(srv.Context(), sub)
 }
 
-func (wsm *WorkspaceManagerServer) MarkActive(ctx context.Context, req *wsmanapi.MarkActiveRequest) (*wsmanapi.MarkActiveResponse, error) {
-	return nil, status.Errorf(codes.Unimplemented, "method MarkActive not implemented")
+// MarkActive records a workspace as being active, which prevents it from timing out
+func (wsm *WorkspaceManagerServer) MarkActive(ctx context.Context, req *wsmanapi.MarkActiveRequest) (res *wsmanapi.MarkActiveResponse, err error) {
+	//nolint:ineffassign
+	span, ctx := tracing.FromContext(ctx, "MarkActive")
+	tracing.ApplyOWI(span, log.OWI("", "", req.Id))
+	defer tracing.FinishSpan(span, &err)
+
+	workspaceID := req.Id
+
+	var ws workspacev1.Workspace
+	err = wsm.Client.Get(ctx, types.NamespacedName{Namespace: wsm.Config.Namespace, Name: req.Id}, &ws)
+	if errors.IsNotFound(err) {
+		return nil, status.Errorf(codes.NotFound, "workspace %s does not exist", req.Id)
+	}
+	if err != nil {
+		return nil, status.Errorf(codes.Internal, "cannot mark workspace: %v", err)
+	}
+
+	var firstUserActivity *timestamppb.Timestamp
+	for _, c := range ws.Status.Conditions {
+		if c.Type == string(workspacev1.WorkspaceConditionFirstUserActivity) {
+			firstUserActivity = timestamppb.New(c.LastTransitionTime.Time)
+		}
+	}
+
+	// If the user has already marked the workspace as active and this request has the
+	// IgnoreIfActive flag set, simply ignore it.
+	if firstUserActivity != nil && req.IgnoreIfActive {
+		return &api.MarkActiveResponse{}, nil
+	}
+
+	// We do not keep the last activity in the workspace resource, to keep the load we're placing
+	// on the K8s master in check. Thus, this state lives locally in a map.
+	now := time.Now().UTC()
+	wsm.activity.Store(req.Id, now)
+
+	// We do, however, maintain the "closed" flag as a condition on the workspace. This flag
+	// should not change very often and provides a better UX if it persists across ws-manager restarts.
+	isMarkedClosed := conditionPresentAndTrue(ws.Status.Conditions, string(workspacev1.WorkspaceConditionClosed))
+	if req.Closed && !isMarkedClosed {
+		err = wsm.modifyWorkspace(ctx, req.Id, true, func(ws *workspacev1.Workspace) error {
+			ws.Status.Conditions = addUniqueCondition(ws.Status.Conditions, metav1.Condition{
+				Type:               string(workspacev1.WorkspaceConditionClosed),
+				Status:             metav1.ConditionTrue,
+				LastTransitionTime: metav1.NewTime(now),
+				Reason:             "MarkActiveRequest",
+			})
+			return nil
+		})
+	} else if !req.Closed && isMarkedClosed {
+		err = wsm.modifyWorkspace(ctx, req.Id, true, func(ws *workspacev1.Workspace) error {
+			ws.Status.Conditions = addUniqueCondition(ws.Status.Conditions, metav1.Condition{
+				Type:               string(workspacev1.WorkspaceConditionClosed),
+				Status:             metav1.ConditionFalse,
+				LastTransitionTime: metav1.NewTime(now),
+				Reason:             "MarkActiveRequest",
+			})
+			return nil
+		})
+	}
+	if err != nil {
+		log.WithError(err).WithFields(log.OWI("", "", workspaceID)).Warn("was unable to mark workspace properly")
+	}
+
+	// If this is the first call, mark the workspace with the FirstUserActivity condition.
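+	// Unlike the in-memory activity map above, this condition survives ws-manager restarts,
+	// which is what lets the timeout controller tell "never active" apart from
+	// "activity record lost on restart".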
+ if firstUserActivity == nil { + err := wsm.modifyWorkspace(ctx, req.Id, true, func(ws *workspacev1.Workspace) error { + ws.Status.Conditions = addUniqueCondition(ws.Status.Conditions, metav1.Condition{ + Type: string(workspacev1.WorkspaceConditionFirstUserActivity), + Status: metav1.ConditionTrue, + LastTransitionTime: metav1.NewTime(now), + Reason: "FirstActivity", + }) + return nil + }) + if err != nil { + log.WithError(err).WithFields(log.OWI("", "", workspaceID)).Warn("was unable to set FirstUserActivity condition on workspace") + return nil, err + } + } + + return &api.MarkActiveResponse{}, nil } func (wsm *WorkspaceManagerServer) SetTimeout(ctx context.Context, req *wsmanapi.SetTimeoutRequest) (*wsmanapi.SetTimeoutResponse, error) { @@ -501,7 +588,7 @@ func extractWorkspaceStatus(ws *workspacev1.Workspace) *wsmanapi.WorkspaceStatus var timeout string if ws.Spec.Timeout.Time != nil { - timeout = ws.Spec.Timeout.Time.String() + timeout = ws.Spec.Timeout.Time.Duration.String() } var phase wsmanapi.WorkspacePhase @@ -527,7 +614,7 @@ func extractWorkspaceStatus(ws *workspacev1.Workspace) *wsmanapi.WorkspaceStatus var firstUserActivity *timestamppb.Timestamp for _, c := range ws.Status.Conditions { - if c.Type == string(workspacev1.WorkspaceConditionUserActivity) { + if c.Type == string(workspacev1.WorkspaceConditionFirstUserActivity) { firstUserActivity = timestamppb.New(c.LastTransitionTime.Time) } } @@ -878,3 +965,23 @@ func (m *workspaceMetrics) Describe(ch chan<- *prometheus.Desc) { func (m *workspaceMetrics) Collect(ch chan<- prometheus.Metric) { m.totalStartsCounterVec.Collect(ch) } + +func addUniqueCondition(conds []metav1.Condition, cond metav1.Condition) []metav1.Condition { + for i, c := range conds { + if c.Type == cond.Type { + conds[i] = cond + return conds + } + } + + return append(conds, cond) +} + +func conditionPresentAndTrue(cond []metav1.Condition, tpe string) bool { + for _, c := range cond { + if c.Type == tpe { + return c.Status == metav1.ConditionTrue + } + } + return false +} diff --git a/components/ws-manager-mk2/ws-manager-mk2.code-workspace b/components/ws-manager-mk2/ws-manager-mk2.code-workspace new file mode 100644 index 00000000000000..b0a6157bfdb05d --- /dev/null +++ b/components/ws-manager-mk2/ws-manager-mk2.code-workspace @@ -0,0 +1,51 @@ +{ + "folders": [ + { "path": "../common-go" }, + { "path": "../ws-manager" }, + { "path": "../ws-manager-api" }, + { "path": "../ws-manager-mk2" }, + { "path": "../server" }, + { "path": "../../test" }, + { "path": "../../dev/gpctl" }, + { "path": "../../install/installer" } + ], + "settings": { + "typescript.tsdk": "gitpod/node_modules/typescript/lib", + "[json]": { + "editor.insertSpaces": true, + "editor.tabSize": 2 + }, + "[yaml]": { + "editor.insertSpaces": true, + "editor.tabSize": 2 + }, + "[go]": { + "editor.formatOnSave": true + }, + "[tf]": { + "editor.insertSpaces": true, + "editor.tabSize": 2 + }, + "go.formatTool": "goimports", + "go.useLanguageServer": true, + "workspace.supportMultiRootWorkspace": true, + "database.connections": [ + { + "type": "mysql", + "name": "devstaging DB", + "host": "127.0.0.1:23306", + "username": "gitpod", + "database": "gitpod", + "password": "test" + } + ], + "launch": {}, + "files.exclude": { + "**/.git": true + }, + "go.lintTool": "golangci-lint", + "gopls": { + "allowModfileModifications": true + } + } +} diff --git a/install/installer/pkg/components/ws-manager/configmap.go b/install/installer/pkg/components/ws-manager/configmap.go index 46ea967e38cd72..45476adf5c2e0c 
100644 --- a/install/installer/pkg/components/ws-manager/configmap.go +++ b/install/installer/pkg/components/ws-manager/configmap.go @@ -222,9 +222,10 @@ func configmap(ctx *common.RenderContext) ([]runtime.Object, error) { Interrupted: util.Duration(5 * time.Minute), }, //EventTraceLog: "", // todo(sje): make conditional based on config - ReconnectionInterval: util.Duration(30 * time.Second), - RegistryFacadeHost: fmt.Sprintf("reg.%s:%d", ctx.Config.Domain, common.RegistryFacadeServicePort), - WorkspaceCACertSecret: customCASecret, + ReconnectionInterval: util.Duration(30 * time.Second), + RegistryFacadeHost: fmt.Sprintf("reg.%s:%d", ctx.Config.Domain, common.RegistryFacadeServicePort), + WorkspaceCACertSecret: customCASecret, + TimeoutMaxConcurrentReconciles: 5, }, Content: struct { Storage storageconfig.StorageConfig `json:"storage"`
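The interplay between the in-memory activity map and the timeout decision can be hard to see from the diff alone. The following standalone sketch (stdlib only; types and names are simplified inventions for illustration, not code from this PR) mirrors the core idea: MarkActive-style heartbeats land in a map, and the timeout check measures inactivity from the last heartbeat, falling back to the creation time when none exists.

package main

import (
	"fmt"
	"sync"
	"time"
)

// activityMap mirrors pkg/activity.WorkspaceActivity: last activity is kept
// in memory rather than on the Workspace resource, to keep load off the K8s API.
type activityMap struct{ m sync.Map }

func (a *activityMap) Store(id string, t time.Time) { a.m.Store(id, &t) }

func (a *activityMap) GetLastActivity(id string) *time.Time {
	if v, ok := a.m.Load(id); ok {
		return v.(*time.Time)
	}
	return nil
}

// timedOutReason mirrors the controller's decide closure: an empty string means
// "not timed out", otherwise a human-readable reason is returned.
func timedOutReason(last *time.Time, created time.Time, timeout time.Duration) string {
	start := created
	if last != nil {
		start = *last // measure inactivity since the last heartbeat, not lifetime
	}
	if elapsed := time.Since(start); elapsed >= timeout {
		return fmt.Sprintf("workspace timed out: period of inactivity (%s) exceeded the %s limit",
			elapsed.Round(time.Minute), timeout)
	}
	return ""
}

func main() {
	var act activityMap
	created := time.Now().Add(-3 * time.Hour)

	// No MarkActive yet: inactivity is measured from the creation time, so a
	// one-hour timeout has long expired.
	fmt.Println(timedOutReason(act.GetLastActivity("ws1"), created, time.Hour))

	// A recent heartbeat (as MarkActive would record) resets the clock.
	act.Store("ws1", time.Now())
	fmt.Println(timedOutReason(act.GetLastActivity("ws1"), created, time.Hour) == "")
}

Running this prints a timeout reason for the heartbeat-less case and true afterwards, which is exactly the state transition the timeout controller observes before and after a MarkActive call.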