Skip to content

[ws-manager-mk2] Implement MarkActive #16181

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Feb 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions components/ws-manager-api/go/crd/v1/workspace_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ type WorkspaceStatus struct {
Runtime *WorkspaceRuntimeStatus `json:"runtime,omitempty"`
}

// +kubebuilder:validation:Enum=Deployed;Failed;Timeout;UserActivity;HeadlessTaskFailed;StoppedByRequest;EverReady;ContentReady;BackupComplete;BackupFailure
// +kubebuilder:validation:Enum=Deployed;Failed;Timeout;FirstUserActivity;Closed;HeadlessTaskFailed;StoppedByRequest;EverReady;ContentReady;BackupComplete;BackupFailure
type WorkspaceCondition string

const (
Expand All @@ -144,8 +144,11 @@ const (
// Timeout contains the reason the workspace has timed out.
WorkspaceConditionTimeout WorkspaceCondition = "Timeout"

// UserActivity is the time when MarkActive was first called on the workspace
WorkspaceConditionUserActivity WorkspaceCondition = "UserActivity"
// FirstUserActivity is the time when MarkActive was first called on the workspace
WorkspaceConditionFirstUserActivity WorkspaceCondition = "FirstUserActivity"

// Closed indicates that a workspace is marked as closed. This will shorten its timeout.
WorkspaceConditionClosed WorkspaceCondition = "Closed"

// HeadlessTaskFailed indicates that a headless workspace task failed
WorkspaceConditionsHeadlessTaskFailed WorkspaceCondition = "HeadlessTaskFailed"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,9 @@ func (r *WorkspaceReconciler) Reconcile(ctx context.Context, req ctrl.Request) (

var workspace workspacev1.Workspace
if err := r.Get(ctx, req.NamespacedName, &workspace); err != nil {
log.Error(err, "unable to fetch workspace")
if !errors.IsNotFound(err) {
log.Error(err, "unable to fetch workspace")
}
// we'll ignore not-found errors, since they can't be fixed by an immediate
// requeue (we'll need to wait for a new notification), and we can get them
// on deleted requests.
Expand Down
8 changes: 5 additions & 3 deletions components/ws-manager-mk2/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (
"github.com/gitpod-io/gitpod/common-go/pprof"
regapi "github.com/gitpod-io/gitpod/registry-facade/api"
"github.com/gitpod-io/gitpod/ws-manager-mk2/controllers"
"github.com/gitpod-io/gitpod/ws-manager-mk2/pkg/activity"
"github.com/gitpod-io/gitpod/ws-manager-mk2/service"
wsmanapi "github.com/gitpod-io/gitpod/ws-manager/api"
config "github.com/gitpod-io/gitpod/ws-manager/api/config"
Expand Down Expand Up @@ -103,7 +104,8 @@ func main() {
os.Exit(1)
}

wsmanService, err := setupGRPCService(cfg, mgr.GetClient())
activity := &activity.WorkspaceActivity{}
wsmanService, err := setupGRPCService(cfg, mgr.GetClient(), activity)
if err != nil {
setupLog.Error(err, "unable to start manager service")
os.Exit(1)
Expand Down Expand Up @@ -137,7 +139,7 @@ func main() {
}
}

func setupGRPCService(cfg *config.ServiceConfiguration, k8s client.Client) (*service.WorkspaceManagerServer, error) {
func setupGRPCService(cfg *config.ServiceConfiguration, k8s client.Client, activity *activity.WorkspaceActivity) (*service.WorkspaceManagerServer, error) {
// TODO(cw): remove use of common-go/log

if len(cfg.RPCServer.RateLimits) > 0 {
Expand Down Expand Up @@ -170,7 +172,7 @@ func setupGRPCService(cfg *config.ServiceConfiguration, k8s client.Client) (*ser

grpcOpts = append(grpcOpts, grpc.UnknownServiceHandler(proxy.TransparentHandler(imagebuilderDirector(cfg.ImageBuilderProxy.TargetAddr))))

srv := service.NewWorkspaceManagerServer(k8s, &cfg.Manager, metrics.Registry)
srv := service.NewWorkspaceManagerServer(k8s, &cfg.Manager, metrics.Registry, activity)

grpcServer := grpc.NewServer(grpcOpts...)
grpc_prometheus.Register(grpcServer)
Expand Down
29 changes: 29 additions & 0 deletions components/ws-manager-mk2/pkg/activity/activity.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
// Copyright (c) 2023 Gitpod GmbH. All rights reserved.
// Licensed under the GNU Affero General Public License (AGPL).
// See License.AGPL.txt in the project root for license information.

package activity

import (
"sync"
"time"
)

// WorkspaceActivity is used to track the last user activity per workspace. This is
// stored in memory instead of on the Workspace resource to limit load on the k8s API,
// as this value will update often for each workspace.
type WorkspaceActivity struct {
m sync.Map
}

func (w *WorkspaceActivity) Store(workspaceId string, lastActivity time.Time) {
w.m.Store(workspaceId, &lastActivity)
}

func (w *WorkspaceActivity) GetLastActivity(workspaceId string) *time.Time {
lastActivity, ok := w.m.Load(workspaceId)
if ok {
return lastActivity.(*time.Time)
}
return nil
}
140 changes: 126 additions & 14 deletions components/ws-manager-mk2/service/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,13 @@ import (
wsk8s "github.com/gitpod-io/gitpod/common-go/kubernetes"
"github.com/gitpod-io/gitpod/common-go/log"
"github.com/gitpod-io/gitpod/common-go/tracing"
"github.com/gitpod-io/gitpod/ws-manager-mk2/pkg/activity"
"github.com/gitpod-io/gitpod/ws-manager/api"
wsmanapi "github.com/gitpod-io/gitpod/ws-manager/api"
"github.com/gitpod-io/gitpod/ws-manager/api/config"
workspacev1 "github.com/gitpod-io/gitpod/ws-manager/api/crd/v1"

"github.com/sirupsen/logrus"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand All @@ -41,24 +43,26 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client"
)

func NewWorkspaceManagerServer(clnt client.Client, cfg *config.Configuration, reg prometheus.Registerer) *WorkspaceManagerServer {
func NewWorkspaceManagerServer(clnt client.Client, cfg *config.Configuration, reg prometheus.Registerer, activity *activity.WorkspaceActivity) *WorkspaceManagerServer {
metrics := newWorkspaceMetrics()
reg.MustRegister(metrics)

return &WorkspaceManagerServer{
Client: clnt,
Config: cfg,
metrics: metrics,
Client: clnt,
Config: cfg,
metrics: metrics,
activity: activity,
subs: subscriptions{
subscribers: make(map[string]chan *wsmanapi.SubscribeResponse),
},
}
}

type WorkspaceManagerServer struct {
Client client.Client
Config *config.Configuration
metrics *workspaceMetrics
Client client.Client
Config *config.Configuration
metrics *workspaceMetrics
activity *activity.WorkspaceActivity

subs subscriptions
wsmanapi.UnimplementedWorkspaceManagerServer
Expand Down Expand Up @@ -280,10 +284,15 @@ func (wsm *WorkspaceManagerServer) DescribeWorkspace(ctx context.Context, req *w
return nil, status.Errorf(codes.Internal, "cannot lookup workspace: %v", err)
}

return &wsmanapi.DescribeWorkspaceResponse{
result := &wsmanapi.DescribeWorkspaceResponse{
Status: extractWorkspaceStatus(&ws),
// TODO(cw): Add lastActivity
}, nil
}

lastActivity := wsm.activity.GetLastActivity(req.Id)
if lastActivity != nil {
result.LastActivity = lastActivity.UTC().Format(time.RFC3339Nano)
}
return result, nil
}

// Subscribe streams all status updates to a client
Expand All @@ -296,8 +305,91 @@ func (m *WorkspaceManagerServer) Subscribe(req *api.SubscribeRequest, srv api.Wo
return m.subs.Subscribe(srv.Context(), sub)
}

func (wsm *WorkspaceManagerServer) MarkActive(ctx context.Context, req *wsmanapi.MarkActiveRequest) (*wsmanapi.MarkActiveResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method MarkActive not implemented")
// MarkActive records a workspace as being active which prevents it from timing out
func (wsm *WorkspaceManagerServer) MarkActive(ctx context.Context, req *wsmanapi.MarkActiveRequest) (res *wsmanapi.MarkActiveResponse, err error) {
//nolint:ineffassign
span, ctx := tracing.FromContext(ctx, "MarkActive")
tracing.ApplyOWI(span, log.OWI("", "", req.Id))
defer tracing.FinishSpan(span, &err)

workspaceID := req.Id

var ws workspacev1.Workspace
err = wsm.Client.Get(ctx, types.NamespacedName{Namespace: wsm.Config.Namespace, Name: req.Id}, &ws)
if errors.IsNotFound(err) {
return nil, status.Errorf(codes.NotFound, "workspace %s does not exist", req.Id)
}
if err != nil {
return nil, status.Errorf(codes.Internal, "cannot mark workspace: %v", err)
}

var firstUserActivity *timestamppb.Timestamp
for _, c := range ws.Status.Conditions {
if c.Type == string(workspacev1.WorkspaceConditionFirstUserActivity) {
firstUserActivity = timestamppb.New(c.LastTransitionTime.Time)
}
}

// if user already mark workspace as active and this request has IgnoreIfActive flag, just simple ignore it
if firstUserActivity != nil && req.IgnoreIfActive {
return &api.MarkActiveResponse{}, nil
}

// We do not keep the last activity in the workspace resource to limit the load we're placing
// on the K8S master in check. Thus, this state lives locally in a map.
now := time.Now().UTC()
wsm.activity.Store(req.Id, now)

// We do however maintain the the "closed" flag as annotation on the workspace. This flag should not change
// very often and provides a better UX if it persists across ws-manager restarts.
isMarkedClosed := conditionPresentAndTrue(ws.Status.Conditions, string(workspacev1.WorkspaceConditionClosed))
if req.Closed && !isMarkedClosed {
err = wsm.modifyWorkspace(ctx, req.Id, true, func(ws *workspacev1.Workspace) error {
ws.Status.Conditions = addUniqueCondition(ws.Status.Conditions, metav1.Condition{
Type: string(workspacev1.WorkspaceConditionClosed),
Status: metav1.ConditionTrue,
LastTransitionTime: metav1.NewTime(now),
Reason: "MarkActiveRequest",
})
return nil
})
} else if !req.Closed && isMarkedClosed {
err = wsm.modifyWorkspace(ctx, req.Id, true, func(ws *workspacev1.Workspace) error {
ws.Status.Conditions = addUniqueCondition(ws.Status.Conditions, metav1.Condition{
Type: string(workspacev1.WorkspaceConditionClosed),
Status: metav1.ConditionFalse,
LastTransitionTime: metav1.NewTime(now),
Reason: "MarkActiveRequest",
})
return nil
})
}
if err != nil {
logFields := logrus.Fields{
"closed": req.Closed,
"isMarkedClosed": isMarkedClosed,
}
log.WithError(err).WithFields(log.OWI("", "", workspaceID)).WithFields(logFields).Warn("was unable to mark workspace properly")
}

// If it's the first call: Mark the pod with FirstUserActivity condition.
if firstUserActivity == nil {
err := wsm.modifyWorkspace(ctx, req.Id, true, func(ws *workspacev1.Workspace) error {
ws.Status.Conditions = addUniqueCondition(ws.Status.Conditions, metav1.Condition{
Type: string(workspacev1.WorkspaceConditionFirstUserActivity),
Status: metav1.ConditionTrue,
LastTransitionTime: metav1.NewTime(now),
Reason: "MarkActiveRequest",
})
return nil
})
if err != nil {
log.WithError(err).WithFields(log.OWI("", "", workspaceID)).Warn("was unable to set FirstUserActivity condition on workspace")
return nil, err
}
}

return &api.MarkActiveResponse{}, nil
}

func (wsm *WorkspaceManagerServer) SetTimeout(ctx context.Context, req *wsmanapi.SetTimeoutRequest) (*wsmanapi.SetTimeoutResponse, error) {
Expand Down Expand Up @@ -501,7 +593,7 @@ func extractWorkspaceStatus(ws *workspacev1.Workspace) *wsmanapi.WorkspaceStatus

var timeout string
if ws.Spec.Timeout.Time != nil {
timeout = ws.Spec.Timeout.Time.String()
timeout = ws.Spec.Timeout.Time.Duration.String()
}

var phase wsmanapi.WorkspacePhase
Expand All @@ -527,7 +619,7 @@ func extractWorkspaceStatus(ws *workspacev1.Workspace) *wsmanapi.WorkspaceStatus

var firstUserActivity *timestamppb.Timestamp
for _, c := range ws.Status.Conditions {
if c.Type == string(workspacev1.WorkspaceConditionUserActivity) {
if c.Type == string(workspacev1.WorkspaceConditionFirstUserActivity) {
firstUserActivity = timestamppb.New(c.LastTransitionTime.Time)
}
}
Expand Down Expand Up @@ -878,3 +970,23 @@ func (m *workspaceMetrics) Describe(ch chan<- *prometheus.Desc) {
func (m *workspaceMetrics) Collect(ch chan<- prometheus.Metric) {
m.totalStartsCounterVec.Collect(ch)
}

func addUniqueCondition(conds []metav1.Condition, cond metav1.Condition) []metav1.Condition {
for i, c := range conds {
if c.Type == cond.Type {
conds[i] = cond
return conds
}
}

return append(conds, cond)
}

func conditionPresentAndTrue(cond []metav1.Condition, tpe string) bool {
for _, c := range cond {
if c.Type == tpe {
return c.Status == metav1.ConditionTrue
}
}
return false
}
51 changes: 51 additions & 0 deletions components/ws-manager-mk2/ws-manager-mk2.code-workspace
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
{
"folders": [
{ "path": "../common-go" },
{ "path": "../ws-manager" },
{ "path": "../ws-manager-api" },
{ "path": "../ws-manager-mk2" },
{ "path": "../server" },
{ "path": "../../test" },
{ "path": "../../dev/gpctl" },
{ "path": "../../install/installer" }
],
"settings": {
"typescript.tsdk": "gitpod/node_modules/typescript/lib",
"[json]": {
"editor.insertSpaces": true,
"editor.tabSize": 2
},
"[yaml]": {
"editor.insertSpaces": true,
"editor.tabSize": 2
},
"[go]": {
"editor.formatOnSave": true
},
"[tf]": {
"editor.insertSpaces": true,
"editor.tabSize": 2
},
"go.formatTool": "goimports",
"go.useLanguageServer": true,
"workspace.supportMultiRootWorkspace": true,
"database.connections": [
{
"type": "mysql",
"name": "devstaging DB",
"host": "127.0.0.1:23306",
"username": "gitpod",
"database": "gitpod",
"password": "test"
}
],
"launch": {},
"files.exclude": {
"**/.git": true
},
"go.lintTool": "golangci-lint",
"gopls": {
"allowModfileModifications": true
}
}
}