Skip to content

Commit

Permalink
[ws-manager] improve finalizeWorkspaceContent logic
Browse files Browse the repository at this point in the history
  • Loading branch information
sagor999 authored and roboquat committed Aug 30, 2022
1 parent bf9157a commit 92f2f1a
Showing 1 changed file with 44 additions and 57 deletions.
101 changes: 44 additions & 57 deletions components/ws-manager/pkg/manager/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -1007,6 +1007,26 @@ func (m *Monitor) finalizeWorkspaceContent(ctx context.Context, wso *workspaceOb
return
}

ctx, cancelReq := context.WithTimeout(ctx, time.Duration(m.manager.Config.Timeouts.ContentFinalization))
// Make sure only one finalizeWorkspaceContent() can be active per workspace at the same time.
// finalizeWorkspaceContent() may be called multiple times, sometimes within several milliseconds of each other;
// this ensures that we never attempt to dispose of the same workspace from multiple goroutines concurrently.
_, alreadyFinalizing := m.finalizerMap.LoadOrStore(workspaceID, cancelReq)
if alreadyFinalizing {
span.LogKV("alreadyFinalizing", true)
return
}
defer func() {
// we're done disposing - remove from the finalizerMap
val, ok := m.finalizerMap.LoadAndDelete(workspaceID)
if !ok {
return
}

cancelReq := val.(context.CancelFunc)
cancelReq()
}()

disposalStatus := &workspaceDisposalStatus{}
defer func() {
if disposalStatus.Status == DisposalEmpty {
Expand Down Expand Up @@ -1058,13 +1078,7 @@ func (m *Monitor) finalizeWorkspaceContent(ctx context.Context, wso *workspaceOb
doBackup := wso.WasEverReady() && !wso.IsWorkspaceHeadless()
doBackupLogs := tpe == api.WorkspaceType_PREBUILD
doSnapshot := tpe == api.WorkspaceType_PREBUILD
doFinalize := func() (worked bool, gitStatus *csapi.GitStatus, err error) {
_, alreadyFinalizing := m.finalizerMap.Load(workspaceID)
if alreadyFinalizing {
span.LogKV("alreadyFinalizing", true)
return false, nil, nil
}

doFinalize := func() (gitStatus *csapi.GitStatus, err error) {
// Maybe the workspace never made it to a phase where we actually initialized a workspace.
// Assuming that once we've had a nodeName we've spoken to ws-daemon it's safe to assume that if
// we don't have a nodeName we don't need to dispose the workspace.
Expand All @@ -1073,50 +1087,25 @@ func (m *Monitor) finalizeWorkspaceContent(ctx context.Context, wso *workspaceOb
if !doBackup && !doSnapshot && wso.NodeName() == "" {
// we don't need a backup and have never spoken to ws-daemon: we're good here.
span.LogKV("noBackupNeededAndNoNode", true)
return true, &csapi.GitStatus{}, nil
return &csapi.GitStatus{}, nil
}

// we're not yet finalizing - start the process
snc, err := m.manager.connectToWorkspaceDaemon(ctx, *wso)
if err != nil {
tracing.LogError(span, err)
return true, nil, status.Errorf(codes.Unavailable, "cannot connect to workspace daemon: %q", err)
return nil, status.Errorf(codes.Unavailable, "cannot connect to workspace daemon: %q", err)
}

defer func() {
// we're done disposing - remove from the finalizerMap
val, ok := m.finalizerMap.LoadAndDelete(workspaceID)
if !ok {
return
}

cancelReq := val.(context.CancelFunc)
cancelReq()
}()

// only build prebuild snapshots of initialized/ready workspaces.
if doSnapshot {
_, err = snc.WaitForInit(ctx, &wsdaemon.WaitForInitRequest{Id: workspaceID})
if st, ok := grpc_status.FromError(err); ok {
if st.Code() == codes.FailedPrecondition &&
(st.Message() == "workspace is not initializing or ready" || st.Message() == "workspace is not ready") {
log.Warn("skipping snapshot creation because content-initializer never finished or the workspace reached a ready state")
doSnapshot = false
} else if st.Code() == codes.NotFound {
// the workspace has gone for some reason
// e.g. since it was a retry, it already succeeded the first time.
log.WithError(err).Warnf("skipping snapshot and disposing because the workspace has already gone")
return false, &csapi.GitStatus{}, nil
}
}
if err != nil {
log.WithError(err).Warn("WaitForInit returned an error")
}
// make sure that the workspace was ready; otherwise there is no need to back up anything,
// as we might back up corrupted workspace state.
// this also ensures that if INITIALIZING is still going on, we wait for it to finish before disposing the workspace
_, err = snc.WaitForInit(ctx, &wsdaemon.WaitForInitRequest{Id: workspaceID})
if err != nil {
tracing.LogError(span, err)
return nil, err
}

ctx, cancelReq := context.WithTimeout(ctx, time.Duration(m.manager.Config.Timeouts.ContentFinalization))
m.finalizerMap.Store(workspaceID, cancelReq)

if pvcFeatureEnabled {
// pvc was created with the name of the pod. see createDefiniteWorkspacePod()
pvcName := wso.Pod.Name
Expand All @@ -1138,7 +1127,7 @@ func (m *Monitor) finalizeWorkspaceContent(ctx context.Context, wso *workspaceOb
err = m.manager.Clientset.Create(ctx, volumeSnapshot)
if err != nil && !k8serr.IsAlreadyExists(err) {
err = xerrors.Errorf("cannot create volumesnapshot: %v", err)
return true, nil, err
return nil, err
}
createdVolumeSnapshot = true
volumeSnapshotTime = time.Now()
Expand Down Expand Up @@ -1168,7 +1157,7 @@ func (m *Monitor) finalizeWorkspaceContent(ctx context.Context, wso *workspaceOb

err = xerrors.Errorf("%s timed out while waiting for volume snapshot to get ready", m.manager.Config.Timeouts.ContentFinalization.String())
log.Error(err.Error())
return true, nil, err
return nil, err
}

hist, err := m.manager.metrics.volumeSnapshotTimeHistVec.GetMetricWithLabelValues(wsType, wso.Pod.Labels[workspaceClassLabel])
Expand All @@ -1184,20 +1173,20 @@ func (m *Monitor) finalizeWorkspaceContent(ctx context.Context, wso *workspaceOb
err := m.manager.Clientset.Get(ctx, types.NamespacedName{Namespace: "", Name: pvcVolumeSnapshotContentName}, &volumeSnapshotContent)
if err != nil {
log.WithError(err).Error("was unable to get volume snapshot content")
return true, nil, err
return nil, err
}

if volumeSnapshotContent.Status == nil {
return true, nil, xerrors.Errorf("volume snapshot content status is nil")
return nil, xerrors.Errorf("volume snapshot content status is nil")
}
if volumeSnapshotContent.Status.SnapshotHandle == nil {
return true, nil, xerrors.Errorf("volume snapshot content's snapshot handle is nil")
return nil, xerrors.Errorf("volume snapshot content's snapshot handle is nil")
}
snapshotHandle := *volumeSnapshotContent.Status.SnapshotHandle

b, err := json.Marshal(workspaceVolumeSnapshotStatus{VolumeSnapshotName: pvcVolumeSnapshotName, VolumeSnapshotHandle: snapshotHandle})
if err != nil {
return true, nil, err
return nil, err
}

err = m.manager.markWorkspace(context.Background(), workspaceID, addMark(pvcWorkspaceVolumeSnapshotAnnotation, string(b)))
Expand All @@ -1207,7 +1196,7 @@ func (m *Monitor) finalizeWorkspaceContent(ctx context.Context, wso *workspaceOb
if errMark != nil {
log.WithError(errMark).Warn("was unable to mark workspace as failed")
}
return true, nil, err
return nil, err
}

if doSnapshot {
Expand All @@ -1219,7 +1208,7 @@ func (m *Monitor) finalizeWorkspaceContent(ctx context.Context, wso *workspaceOb
if errMark != nil {
log.WithError(errMark).Warn("was unable to mark workspace as failed")
}
return true, nil, err
return nil, err
}
}

Expand All @@ -1232,7 +1221,7 @@ func (m *Monitor) finalizeWorkspaceContent(ctx context.Context, wso *workspaceOb
err = m.manager.deleteWorkspacePVC(ctx, pvcName)
if err != nil {
log.Error(err)
return true, nil, err
return nil, err
}
deletedPVC = true
}
Expand Down Expand Up @@ -1277,7 +1266,7 @@ func (m *Monitor) finalizeWorkspaceContent(ctx context.Context, wso *workspaceOb
if err != nil {
log.WithError(err).Error("DisposeWorkspace failed")
}
return true, gitStatus, err
return gitStatus, err
}

var (
Expand All @@ -1297,16 +1286,11 @@ func (m *Monitor) finalizeWorkspaceContent(ctx context.Context, wso *workspaceOb
}
for i := 0; i < wsdaemonMaxAttempts; i++ {
span.LogKV("attempt", i)
didSometing, gs, err := doFinalize()
gs, err := doFinalize()
if err != nil {
tracing.LogError(span, err)
log.WithError(err).Error("doFinalize failed")
}
if !didSometing {
// someone else is managing finalization process ... we don't have to bother
span.LogKV("did-nothing", true)
return
}

// by default we assume the worst case scenario. If things aren't just as bad, we'll tune it down below.
dataloss = true
Expand Down Expand Up @@ -1348,6 +1332,9 @@ func (m *Monitor) finalizeWorkspaceContent(ctx context.Context, wso *workspaceOb
case codes.FailedPrecondition:
// the workspace content was not in the state we thought it was
dataloss = true
case codes.NotFound:
// something else might have deleted workspace state already and we were not able to do a backup
dataloss = true
}
}
break
Expand Down

0 comments on commit 92f2f1a

Please sign in to comment.