[ws-manager] improve finalizeWorkspaceContent logic #12450

Merged · 1 commit · Aug 30, 2022
101 changes: 44 additions & 57 deletions components/ws-manager/pkg/manager/monitor.go
@@ -1007,6 +1007,26 @@ func (m *Monitor) finalizeWorkspaceContent(ctx context.Context, wso *workspaceOb
         return
     }
 
+    ctx, cancelReq := context.WithTimeout(ctx, time.Duration(m.manager.Config.Timeouts.ContentFinalization))
+    // Make sure only one finalizeWorkspaceContent() can be active at the same time.
+    // finalizeWorkspaceContent() may be called multiple times, sometimes within several milliseconds.
+    // this ensures that we will not attempt to do any disposing from multiple threads
+    _, alreadyFinalizing := m.finalizerMap.LoadOrStore(workspaceID, cancelReq)
+    if alreadyFinalizing {
+        span.LogKV("alreadyFinalizing", true)
+        return
+    }
+    defer func() {
+        // we're done disposing - remove from the finalizerMap
+        val, ok := m.finalizerMap.LoadAndDelete(workspaceID)
+        if !ok {
+            return
+        }
+
+        cancelReq := val.(context.CancelFunc)
+        cancelReq()
+    }()
+
     disposalStatus := &workspaceDisposalStatus{}
     defer func() {
         if disposalStatus.Status == DisposalEmpty {
@@ -1058,13 +1078,7 @@ func (m *Monitor) finalizeWorkspaceContent(ctx context.Context, wso *workspaceOb
     doBackup := wso.WasEverReady() && !wso.IsWorkspaceHeadless()
     doBackupLogs := tpe == api.WorkspaceType_PREBUILD
     doSnapshot := tpe == api.WorkspaceType_PREBUILD
-    doFinalize := func() (worked bool, gitStatus *csapi.GitStatus, err error) {
-        _, alreadyFinalizing := m.finalizerMap.Load(workspaceID)
-        if alreadyFinalizing {
-            span.LogKV("alreadyFinalizing", true)
-            return false, nil, nil
-        }
-
+    doFinalize := func() (gitStatus *csapi.GitStatus, err error) {
         // Maybe the workspace never made it to a phase where we actually initialized a workspace.
         // Assuming that once we've had a nodeName we've spoken to ws-daemon, it's safe to assume that if
         // we don't have a nodeName we don't need to dispose the workspace.
@@ -1073,50 +1087,25 @@ func (m *Monitor) finalizeWorkspaceContent(ctx context.Context, wso *workspaceOb
         if !doBackup && !doSnapshot && wso.NodeName() == "" {
             // we don't need a backup and have never spoken to ws-daemon: we're good here.
             span.LogKV("noBackupNeededAndNoNode", true)
-            return true, &csapi.GitStatus{}, nil
+            return &csapi.GitStatus{}, nil
         }
 
-        // we're not yet finalizing - start the process
         snc, err := m.manager.connectToWorkspaceDaemon(ctx, *wso)
         if err != nil {
             tracing.LogError(span, err)
-            return true, nil, status.Errorf(codes.Unavailable, "cannot connect to workspace daemon: %q", err)
+            return nil, status.Errorf(codes.Unavailable, "cannot connect to workspace daemon: %q", err)
         }
 
-        defer func() {
-            // we're done disposing - remove from the finalizerMap
-            val, ok := m.finalizerMap.LoadAndDelete(workspaceID)
-            if !ok {
-                return
-            }
-
-            cancelReq := val.(context.CancelFunc)
-            cancelReq()
-        }()
-
-        // only build prebuild snapshots of initialized/ready workspaces.
-        if doSnapshot {
-            _, err = snc.WaitForInit(ctx, &wsdaemon.WaitForInitRequest{Id: workspaceID})
-            if st, ok := grpc_status.FromError(err); ok {
-                if st.Code() == codes.FailedPrecondition &&
-                    (st.Message() == "workspace is not initializing or ready" || st.Message() == "workspace is not ready") {
-                    log.Warn("skipping snapshot creation because content-initializer never finished or the workspace reached a ready state")
-                    doSnapshot = false
-                } else if st.Code() == codes.NotFound {
-                    // the workspace has gone for some reason,
-                    // e.g. since it was a retry, it already succeeded the first time.
-                    log.WithError(err).Warnf("skipping snapshot and disposing because the workspace has already gone")
-                    return false, &csapi.GitStatus{}, nil
-                }
-            }
-            if err != nil {
-                log.WithError(err).Warn("WaitForInit returned an error")
-            }
-        }
-
-        ctx, cancelReq := context.WithTimeout(ctx, time.Duration(m.manager.Config.Timeouts.ContentFinalization))
-        m.finalizerMap.Store(workspaceID, cancelReq)
+        // make sure that the workspace was ready, otherwise there is no need to back up anything,
+        // as we might back up corrupted workspace state.
+        // this also ensures that if INITIALIZING is still going, we will wait for it to finish before disposing the workspace
+        _, err = snc.WaitForInit(ctx, &wsdaemon.WaitForInitRequest{Id: workspaceID})
+        if err != nil {
+            tracing.LogError(span, err)
+            return nil, err
+        }
 
         if pvcFeatureEnabled {
             // pvc was created with the name of the pod. see createDefiniteWorkspacePod()
             pvcName := wso.Pod.Name
@@ -1138,7 +1127,7 @@ func (m *Monitor) finalizeWorkspaceContent(ctx context.Context, wso *workspaceOb
             err = m.manager.Clientset.Create(ctx, volumeSnapshot)
             if err != nil && !k8serr.IsAlreadyExists(err) {
                 err = xerrors.Errorf("cannot create volumesnapshot: %v", err)
-                return true, nil, err
+                return nil, err
             }
             createdVolumeSnapshot = true
             volumeSnapshotTime = time.Now()
@@ -1168,7 +1157,7 @@ func (m *Monitor) finalizeWorkspaceContent(ctx context.Context, wso *workspaceOb
 
                 err = xerrors.Errorf("%s timed out while waiting for volume snapshot to get ready", m.manager.Config.Timeouts.ContentFinalization.String())
                 log.Error(err.Error())
-                return true, nil, err
+                return nil, err
             }
 
             hist, err := m.manager.metrics.volumeSnapshotTimeHistVec.GetMetricWithLabelValues(wsType, wso.Pod.Labels[workspaceClassLabel])
@@ -1184,20 +1173,20 @@ func (m *Monitor) finalizeWorkspaceContent(ctx context.Context, wso *workspaceOb
             err := m.manager.Clientset.Get(ctx, types.NamespacedName{Namespace: "", Name: pvcVolumeSnapshotContentName}, &volumeSnapshotContent)
             if err != nil {
                 log.WithError(err).Error("was unable to get volume snapshot content")
-                return true, nil, err
+                return nil, err
             }
 
             if volumeSnapshotContent.Status == nil {
-                return true, nil, xerrors.Errorf("volume snapshot content status is nil")
+                return nil, xerrors.Errorf("volume snapshot content status is nil")
             }
             if volumeSnapshotContent.Status.SnapshotHandle == nil {
-                return true, nil, xerrors.Errorf("volume snapshot content's snapshot handle is nil")
+                return nil, xerrors.Errorf("volume snapshot content's snapshot handle is nil")
             }
             snapshotHandle := *volumeSnapshotContent.Status.SnapshotHandle
 
             b, err := json.Marshal(workspaceVolumeSnapshotStatus{VolumeSnapshotName: pvcVolumeSnapshotName, VolumeSnapshotHandle: snapshotHandle})
             if err != nil {
-                return true, nil, err
+                return nil, err
             }
 
             err = m.manager.markWorkspace(context.Background(), workspaceID, addMark(pvcWorkspaceVolumeSnapshotAnnotation, string(b)))
@@ -1207,7 +1196,7 @@ func (m *Monitor) finalizeWorkspaceContent(ctx context.Context, wso *workspaceOb
             if errMark != nil {
                 log.WithError(errMark).Warn("was unable to mark workspace as failed")
             }
-            return true, nil, err
+            return nil, err
         }
 
         if doSnapshot {
@@ -1219,7 +1208,7 @@ func (m *Monitor) finalizeWorkspaceContent(ctx context.Context, wso *workspaceOb
             if errMark != nil {
                 log.WithError(errMark).Warn("was unable to mark workspace as failed")
             }
-            return true, nil, err
+            return nil, err
             }
         }
 
@@ -1232,7 +1221,7 @@ func (m *Monitor) finalizeWorkspaceContent(ctx context.Context, wso *workspaceOb
             err = m.manager.deleteWorkspacePVC(ctx, pvcName)
             if err != nil {
                 log.Error(err)
-                return true, nil, err
+                return nil, err
             }
             deletedPVC = true
         }
@@ -1277,7 +1266,7 @@ func (m *Monitor) finalizeWorkspaceContent(ctx context.Context, wso *workspaceOb
         if err != nil {
             log.WithError(err).Error("DisposeWorkspace failed")
         }
-        return true, gitStatus, err
+        return gitStatus, err
     }
 
     var (
@@ -1297,16 +1286,11 @@ func (m *Monitor) finalizeWorkspaceContent(ctx context.Context, wso *workspaceOb
     }
     for i := 0; i < wsdaemonMaxAttempts; i++ {
         span.LogKV("attempt", i)
-        didSometing, gs, err := doFinalize()
+        gs, err := doFinalize()
         if err != nil {
             tracing.LogError(span, err)
             log.WithError(err).Error("doFinalize failed")
         }
-        if !didSometing {
-            // someone else is managing the finalization process ... we don't have to bother
-            span.LogKV("did-nothing", true)
-            return
-        }
 
         // by default we assume the worst case scenario. If things aren't just as bad, we'll tune it down below.
         dataloss = true
@@ -1348,6 +1332,9 @@ func (m *Monitor) finalizeWorkspaceContent(ctx context.Context, wso *workspaceOb
         case codes.FailedPrecondition:
             // the workspace content was not in the state we thought it was
             dataloss = true
+        case codes.NotFound:
+            // something else might have deleted the workspace state already and we were not able to do a backup
+            dataloss = true
         }
     }
     break
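
---

Review note: the heart of this change is that the `finalizerMap` bookkeeping moves from inside `doFinalize` to the top of `finalizeWorkspaceContent`, and switches from a non-atomic `Load` (check) followed by a later `Store` (register) to a single atomic `LoadOrStore`. Below is a minimal, self-contained sketch of that single-flight pattern; the names (`finalizer`, `inflight`, `ws-abc123`) and the 5-second timeout standing in for `Timeouts.ContentFinalization` are illustrative, not Gitpod's actual API:

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

type finalizer struct {
	inflight sync.Map // workspaceID -> context.CancelFunc, like m.finalizerMap
}

func (f *finalizer) finalize(parent context.Context, workspaceID string) {
	ctx, cancel := context.WithTimeout(parent, 5*time.Second)

	// Atomic check-and-register: only the first caller stores its cancel func.
	if _, alreadyFinalizing := f.inflight.LoadOrStore(workspaceID, cancel); alreadyFinalizing {
		cancel() // we lost the race; release our own timeout context
		fmt.Println("already finalizing, skipping:", workspaceID)
		return
	}
	defer func() {
		// We're done disposing: free the slot and cancel the stored context.
		if val, ok := f.inflight.LoadAndDelete(workspaceID); ok {
			val.(context.CancelFunc)()
		}
	}()

	// ... dispose workspace content under ctx ...
	select {
	case <-time.After(100 * time.Millisecond):
		fmt.Println("disposed:", workspaceID)
	case <-ctx.Done():
		fmt.Println("finalization canceled:", ctx.Err())
	}
}

func main() {
	var f finalizer
	var wg sync.WaitGroup
	for i := 0; i < 3; i++ { // simulate calls arriving within milliseconds
		wg.Add(1)
		go func() {
			defer wg.Done()
			f.finalize(context.Background(), "ws-abc123")
		}()
	}
	wg.Wait()
}
```

Because `LoadOrStore` is atomic, two goroutines entering within milliseconds of each other can never both proceed; under the old `Load`-then-`Store` sequence there was a window in which both could pass the check and dispose concurrently.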
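
The new `codes.NotFound` case in the last hunk extends the error triage that runs after `DisposeWorkspace` fails. A hedged sketch of that triage shape follows; `status.FromError` and `codes` are the real `google.golang.org/grpc` packages, but the branch bodies only paraphrase what the visible hunks say (both `FailedPrecondition` and `NotFound` keep `dataloss = true`; the diff's "we'll tune it down below" branches that clear the flag are not shown, so they are omitted here):

```go
package main

import (
	"fmt"

	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

// triage mirrors the shape of the switch in the retry loop: assume the
// worst (data loss) by default, then inspect the gRPC status code.
func triage(err error) (dataloss bool) {
	dataloss = true // worst case by default, as the loop above does
	st, ok := status.FromError(err)
	if !ok {
		return dataloss // not a gRPC status error; keep the pessimistic default
	}
	switch st.Code() {
	case codes.FailedPrecondition:
		// the workspace content was not in the state we thought it was
	case codes.NotFound:
		// something else deleted the workspace state before we could back it up
	default:
		// unhandled codes also keep the pessimistic default
	}
	return dataloss
}

func main() {
	fmt.Println(triage(status.Error(codes.NotFound, "workspace does not exist")))      // true
	fmt.Println(triage(status.Error(codes.FailedPrecondition, "workspace not ready"))) // true
}
```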