Skip to content

Commit

Permalink
Watch VolumeSnapshot object is ready
Browse files Browse the repository at this point in the history
When the VolumeSnapshot object is ready, notify to the Pod that it
belongs to.

Signed-off-by: JenTing Hsiao <hsiaoairplane@gmail.com>
  • Loading branch information
jenting committed Jun 22, 2022
1 parent 01c7c26 commit 038827c
Showing 1 changed file with 75 additions and 43 deletions.
118 changes: 75 additions & 43 deletions components/ws-manager/pkg/manager/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
"sigs.k8s.io/controller-runtime/pkg/client"

Expand Down Expand Up @@ -83,6 +82,8 @@ type Monitor struct {
act actingManager

OnError func(error)

notifyPod map[string]chan string
}

// CreateMonitor creates a new monitor
Expand All @@ -103,6 +104,8 @@ func (m *Manager) CreateMonitor() (*Monitor, error) {
OnError: func(err error) {
log.WithError(err).Error("workspace monitor error")
},

notifyPod: make(map[string]chan string),
}
res.eventpool = workpool.NewEventWorkerPool(res.handleEvent)
res.act = struct {
Expand Down Expand Up @@ -145,13 +148,47 @@ func (m *Monitor) handleEvent(evt watch.Event) {
switch evt.Object.(type) {
case *corev1.Pod:
err = m.onPodEvent(evt)
case *volumesnapshotv1.VolumeSnapshot:
err = m.onVolumesnapshotEvent(evt)
}

if err != nil {
m.OnError(err)
}
}

func (m *Monitor) onVolumesnapshotEvent(evt watch.Event) error {
vs, ok := evt.Object.(*volumesnapshotv1.VolumeSnapshot)
if !ok {
return xerrors.Errorf("received non-volume-snapshot event")
}

log := log.WithField("volumesnapshot", vs.Name)

if vs.Spec.Source.PersistentVolumeClaimName == nil {
// there is no pvc name within the VolumeSnapshot object
log.Warn("the spec.source.persistentVolumeClaimName is empty")
return nil
}

// the pod name is 1:1 mapping to pvc name
podName := *vs.Spec.Source.PersistentVolumeClaimName
log = log.WithField("pod", podName)

if vs.Status == nil || vs.Status.ReadyToUse == nil || !*vs.Status.ReadyToUse || vs.Status.BoundVolumeSnapshotContentName == nil {
return nil
}

vsc := *vs.Status.BoundVolumeSnapshotContentName
log.Infof("the vsc %s is ready", vsc)
if m.notifyPod[podName] == nil {
m.notifyPod[podName] = make(chan string)
}
m.notifyPod[podName] <- vsc

return nil
}

// onPodEvent interpretes Kubernetes events, translates and broadcasts them, and acts based on them
func (m *Monitor) onPodEvent(evt watch.Event) error {
// Beware: we patch running pods to add annotations. At the moment this is not a problem as do not attach
Expand Down Expand Up @@ -987,52 +1024,47 @@ func (m *Monitor) finalizeWorkspaceContent(ctx context.Context, wso *workspaceOb
volumeSnapshotTime = time.Now()
}
if createdVolumeSnapshot {
backoff := wait.Backoff{
Steps: 30,
Duration: 100 * time.Millisecond,
Factor: 1.5,
Jitter: 0.1,
Cap: 10 * time.Minute,
waitTimeout := 10 * time.Minute
ctx, cancel := context.WithTimeout(context.Background(), waitTimeout)
defer cancel()

if m.notifyPod[wso.Pod.Name] == nil {
m.notifyPod[wso.Pod.Name] = make(chan string)
}
log = log.WithField("VolumeSnapshot.Name", pvcVolumeSnapshotName)
err = wait.ExponentialBackoff(backoff, func() (bool, error) {
var volumeSnapshot volumesnapshotv1.VolumeSnapshot
err := m.manager.Clientset.Get(ctx, types.NamespacedName{Namespace: m.manager.Config.Namespace, Name: pvcVolumeSnapshotName}, &volumeSnapshot)
if err != nil {
if k8serr.IsNotFound(err) {
// volumesnapshot doesn't exist yet, retry again
return false, nil

for {
select {
case pvcVolumeSnapshotContentName = <-m.notifyPod[wso.Pod.Name]:
readyVolumeSnapshot = true
break
case <-ctx.Done():
// there might be a chance that the VolumeSnapshot is ready but somehow
// we did not receive the notification.
// let's give it the last chance to check the VolumeSnapshot is ready
var vs volumesnapshotv1.VolumeSnapshot
err := m.manager.Clientset.Get(ctx, types.NamespacedName{Namespace: m.manager.Config.Namespace, Name: pvcVolumeSnapshotName}, &vs)
if err == nil && vs.Status != nil && vs.Status.ReadyToUse != nil && *vs.Status.ReadyToUse && vs.Status.BoundVolumeSnapshotContentName != nil {
pvcVolumeSnapshotContentName = *vs.Status.BoundVolumeSnapshotContentName
readyVolumeSnapshot = true
break
}
log.WithError(err).Error("was unable to get volume snapshot")
return false, err

err = xerrors.Errorf("%s timed out while waiting for volume snapshot to get ready", waitTimeout.String())
log.Error(err.Error())
return true, nil, err
}
if volumeSnapshot.Status != nil {
if volumeSnapshot.Status.ReadyToUse != nil && *(volumeSnapshot.Status.ReadyToUse) && volumeSnapshot.Status.BoundVolumeSnapshotContentName != nil {
pvcVolumeSnapshotContentName = *volumeSnapshot.Status.BoundVolumeSnapshotContentName
return true, nil
}
if volumeSnapshot.Status.Error != nil {
if volumeSnapshot.Status.Error.Message != nil {
err = xerrors.Errorf("error during volume snapshot creation: %s", *volumeSnapshot.Status.Error.Message)
log.WithError(err).Error("unable to create volume snapshot")
return false, err
}
log.Error("unknown error during volume snapshot creation")
return false, xerrors.Errorf("unknown error during volume snapshot creation")
}

if !readyVolumeSnapshot {
continue
}
return false, nil
})
if err != nil {
log.WithError(err).Errorf("failed while waiting for volume snapshot to get ready")
return true, nil, err
}
readyVolumeSnapshot = true
hist, err := m.manager.metrics.volumeSnapshotTimeHistVec.GetMetricWithLabelValues(wsType, wso.Pod.Labels[workspaceClassLabel])
if err != nil {
log.WithError(err).WithField("type", wsType).Warn("cannot get volume snapshot time histogram metric")
} else {
hist.Observe(time.Since(volumeSnapshotTime).Seconds())

hist, err := m.manager.metrics.volumeSnapshotTimeHistVec.GetMetricWithLabelValues(wsType, wso.Pod.Labels[workspaceClassLabel])
if err != nil {
log.WithError(err).WithField("type", wsType).Warn("cannot get volume snapshot time histogram metric")
} else {
hist.Observe(time.Since(volumeSnapshotTime).Seconds())
}
break
}
}
if readyVolumeSnapshot && !markVolumeSnapshotAnnotation {
Expand Down

0 comments on commit 038827c

Please sign in to comment.