Mark dataupload and datadownload status as failed when the Velero pod restarts
Signed-off-by: Ming Qiu <mqiu@vmware.com>
qiuming-best committed Jul 6, 2023
1 parent e54a8af commit 704d292
Showing 4 changed files with 172 additions and 32 deletions.
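
In brief, the change makes both the node-agent server and the Velero server patch any DataUpload or DataDownload left behind by a crashed pod with spec.cancel set, and it adds Spec.Cancel checks to the Accepted and Prepared phases of both reconcilers so those resources end up in the Canceled phase instead of staying in progress forever. The same merge-patch idiom recurs in all four files below; the following condensed sketch shows only that idiom — the package and helper names and the surrounding wiring are illustrative, not part of the commit:

package restartsketch // illustrative only; the real logic lives in the files changed below

import (
	"context"
	"fmt"

	velerov2alpha1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v2alpha1"
	ctrlclient "sigs.k8s.io/controller-runtime/pkg/client"
)

// markDataUploadCancel is a hypothetical helper distilling the pattern used in
// this commit: copy the object, set spec.cancel and a message on the copy, then
// send only that delta to the API server as a merge patch against the original.
func markDataUploadCancel(ctx context.Context, c ctrlclient.Client, du *velerov2alpha1api.DataUpload) error {
	updated := du.DeepCopy()
	updated.Spec.Cancel = true
	updated.Status.Message = fmt.Sprintf("found a dataupload with status %q during the server starting, mark it as cancel", du.Status.Phase)
	// MergeFrom(du) diffs updated against du, so fields the restart path does not own are left untouched.
	return c.Patch(ctx, updated, ctrlclient.MergeFrom(du))
}

Patching with MergeFrom rather than issuing a full Update keeps the restart path from clobbering fields owned by the reconcilers; the reconcilers then notice Spec.Cancel and drive the resource to the Canceled phase with start and completion timestamps, as shown in the controller changes below.
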
64 changes: 62 additions & 2 deletions pkg/cmd/cli/nodeagent/server.go
@@ -256,11 +256,15 @@ func (s *nodeAgentServer) run() {
s.logger.WithError(err).Fatal("Unable to create the pod volume restore controller")
}

if err = controller.NewDataUploadReconciler(s.mgr.GetClient(), s.kubeClient, s.csiSnapshotClient.SnapshotV1(), repoEnsurer, clock.RealClock{}, credentialGetter, s.nodeName, s.fileSystem, s.logger).SetupWithManager(s.mgr); err != nil {
dataUploadReconciler := controller.NewDataUploadReconciler(s.mgr.GetClient(), s.kubeClient, s.csiSnapshotClient.SnapshotV1(), repoEnsurer, clock.RealClock{}, credentialGetter, s.nodeName, s.fileSystem, s.logger)
s.markDataUploadsCancel(s.mgr.GetClient(), dataUploadReconciler)
if err = dataUploadReconciler.SetupWithManager(s.mgr); err != nil {
s.logger.WithError(err).Fatal("Unable to create the data upload controller")
}

if err = controller.NewDataDownloadReconciler(s.mgr.GetClient(), s.kubeClient, repoEnsurer, credentialGetter, s.nodeName, s.logger).SetupWithManager(s.mgr); err != nil {
dataDownloadReconciler := controller.NewDataDownloadReconciler(s.mgr.GetClient(), s.kubeClient, repoEnsurer, credentialGetter, s.nodeName, s.logger)
s.markDataDownloadsCancel(s.mgr.GetClient(), dataDownloadReconciler)
if err = dataDownloadReconciler.SetupWithManager(s.mgr); err != nil {
s.logger.WithError(err).Fatal("Unable to create the data download controller")
}

@@ -333,6 +337,62 @@ func (s *nodeAgentServer) markInProgressCRsFailed() {
s.markInProgressPVRsFailed(client)
}

func (s *nodeAgentServer) markDataUploadsCancel(client ctrlclient.Client, r *controller.DataUploadReconciler) {
pods := &v1.PodList{}
if err := client.List(s.ctx, pods); err != nil {
s.logger.WithError(errors.WithStack(err)).Error("failed to list pods on current node")
return
}
for _, pod := range pods.Items {
du, err := r.FindDataUploadByPod(&pod)
if err != nil {
s.logger.WithError(errors.WithStack(err)).Errorf("failed to get dataupload by pod %s/%s", pod.Namespace, pod.Name)
continue
}
updated := du.DeepCopy()
updated.Spec.Cancel = true
du.Status.Phase = velerov2alpha1api.DataUploadPhaseCanceled
if du.Status.StartTimestamp.IsZero() {
du.Status.StartTimestamp = &metav1.Time{Time: r.Clock.Now()}
}
du.Status.CompletionTimestamp = &metav1.Time{Time: r.Clock.Now()}
updated.Status.Message = fmt.Sprintf("found a dataupload with status %q during the server starting, mark it as cancel", du.Status.Phase)
if err := client.Patch(s.ctx, updated, ctrlclient.MergeFrom(du)); err != nil {
s.logger.WithError(errors.WithStack(err)).Errorf("failed to mark dataupload %q cancel", du.GetName())
continue
}
s.logger.WithField("dataupload", du.GetName()).Warn(du.Status.Message)
}
}

func (s *nodeAgentServer) markDataDownloadsCancel(client ctrlclient.Client, r *controller.DataDownloadReconciler) {
pods := &v1.PodList{}
if err := client.List(s.ctx, pods); err != nil {
s.logger.WithError(errors.WithStack(err)).Error("failed to list pods on current node")
return
}
for _, pod := range pods.Items {
dd, err := r.FindDataDownloadByPod(&pod)
if err != nil {
s.logger.WithError(errors.WithStack(err)).Errorf("failed to get datadownload by pod %s/%s", pod.Namespace, pod.Name)
continue
}
updated := dd.DeepCopy()
updated.Spec.Cancel = true
updated.Status.Message = fmt.Sprintf("found a datadownload with status %q during the server starting, mark it as cancel", dd.Status.Phase)
dd.Status.Phase = velerov2alpha1api.DataDownloadPhaseCanceled
if dd.Status.StartTimestamp.IsZero() {
dd.Status.StartTimestamp = &metav1.Time{Time: r.Clock.Now()}
}
dd.Status.CompletionTimestamp = &metav1.Time{Time: r.Clock.Now()}
if err := client.Patch(s.ctx, updated, ctrlclient.MergeFrom(dd)); err != nil {
s.logger.WithError(errors.WithStack(err)).Errorf("failed to mark datadownload %q cancel", dd.GetName())
continue
}
s.logger.WithField("datadownload", dd.GetName()).Warn(dd.Status.Message)
}
}

func (s *nodeAgentServer) markInProgressPVBsFailed(client ctrlclient.Client) {
pvbs := &velerov1api.PodVolumeBackupList{}
if err := client.List(s.ctx, pvbs, &ctrlclient.MatchingFields{"metadata.namespace": s.namespace}); err != nil {
42 changes: 42 additions & 0 deletions pkg/cmd/server/server.go
@@ -1078,6 +1078,7 @@ func markInProgressBackupsFailed(ctx context.Context, client ctrlclient.Client,
continue
}
log.WithField("backup", backup.GetName()).Warn(updated.Status.FailureReason)
markDataUploadsCancel(ctx, client, &backup, log)
}
}

@@ -1101,5 +1102,46 @@ func markInProgressRestoresFailed(ctx context.Context, client ctrlclient.Client,
continue
}
log.WithField("restore", restore.GetName()).Warn(updated.Status.FailureReason)
markDataDownloadsCancel(ctx, client, &restore, log)
}
}

func markDataUploadsCancel(ctx context.Context, client ctrlclient.Client, backup *velerov1api.Backup, log logrus.FieldLogger) {
dataUploads := &velerov2alpha1api.DataUploadList{}

if err := client.List(ctx, dataUploads, &ctrlclient.MatchingLabels{velerov1api.BackupUIDLabel: string(backup.GetUID())}); err != nil {
log.WithError(errors.WithStack(err)).Error("failed to list dataUploads")
return
}

for _, du := range dataUploads.Items {
updated := du.DeepCopy()
updated.Spec.Cancel = true
updated.Status.Message = fmt.Sprintf("found a dataupload with status %q during the server starting, mark it as cancel", du.Status.Phase)
if err := client.Patch(ctx, updated, ctrlclient.MergeFrom(&du)); err != nil {
log.WithError(errors.WithStack(err)).Errorf("failed to mark dataupload %q cancel", du.GetName())
continue
}
log.WithField("dataupload", du.GetName()).Warn(du.Status.Message)
}
}

func markDataDownloadsCancel(ctx context.Context, client ctrlclient.Client, restore *velerov1api.Restore, log logrus.FieldLogger) {
dataDownloads := &velerov2alpha1api.DataDownloadList{}

if err := client.List(ctx, dataDownloads, &ctrlclient.MatchingLabels{velerov1api.RestoreUIDLabel: string(restore.GetUID())}); err != nil {
log.WithError(errors.WithStack(err)).Error("failed to list dataDownloads")
return
}

for _, dd := range dataDownloads.Items {
updated := dd.DeepCopy()
updated.Spec.Cancel = true
updated.Status.Message = fmt.Sprintf("found a datadownload with status %q during the server starting, mark it as cancel", dd.Status.Phase)
if err := client.Patch(ctx, updated, ctrlclient.MergeFrom(&dd)); err != nil {
log.WithError(errors.WithStack(err)).Errorf("failed to mark dataupload %q cancel", dd.GetName())
continue
}
log.WithField("datadownload", dd.GetName()).Warn(dd.Status.Message)
}
}
48 changes: 34 additions & 14 deletions pkg/controller/data_download_controller.go
@@ -23,6 +23,7 @@ import (

"github.com/pkg/errors"
"github.com/sirupsen/logrus"
corev1 "k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -57,7 +58,7 @@ type DataDownloadReconciler struct {
logger logrus.FieldLogger
credentialGetter *credentials.CredentialGetter
fileSystem filesystem.Interface
clock clock.WithTickerAndDelayedExecution
Clock clock.WithTickerAndDelayedExecution
restoreExposer exposer.GenericRestoreExposer
nodeName string
repositoryEnsurer *repository.Ensurer
@@ -72,7 +73,7 @@ func NewDataDownloadReconciler(client client.Client, kubeClient kubernetes.Inter
logger: logger.WithField("controller", "DataDownload"),
credentialGetter: credentialGetter,
fileSystem: filesystem.NewFileSystem(),
clock: &clock.RealClock{},
Clock: &clock.RealClock{},
nodeName: nodeName,
repositoryEnsurer: repoEnsurer,
restoreExposer: exposer.NewGenericRestoreExposer(kubeClient, logger),
@@ -132,9 +133,14 @@ func (r *DataDownloadReconciler) Reconcile(ctx context.Context, req ctrl.Request

log.Info("Data download is accepted")

if dd.Spec.Cancel {
r.OnDataDownloadCancelled(ctx, dd.GetNamespace(), dd.GetName())
return ctrl.Result{}, nil
}

hostingPodLabels := map[string]string{velerov1api.DataDownloadLabel: dd.Name}

// ep.Expose() will trigger to create one pod whose volume is restored by a given volume snapshot,
// Expose() will trigger to create one pod whose volume is restored by a given volume snapshot,
// but the pod maybe is not in the same node of the current controller, so we need to return it here.
// And then only the controller who is in the same node could do the rest work.
err = r.restoreExposer.Expose(ctx, getDataDownloadOwnerObject(dd), dd.Spec.TargetVolume.PVC, dd.Spec.TargetVolume.Namespace, hostingPodLabels, dd.Spec.OperationTimeout.Duration)
@@ -146,6 +152,12 @@ func (r *DataDownloadReconciler) Reconcile(ctx context.Context, req ctrl.Request
return ctrl.Result{}, nil
} else if dd.Status.Phase == velerov2alpha1api.DataDownloadPhasePrepared {
log.Info("Data download is prepared")

if dd.Spec.Cancel {
r.OnDataDownloadCancelled(ctx, dd.GetNamespace(), dd.GetName())
return ctrl.Result{}, nil
}

fsRestore := r.dataPathMgr.GetAsyncBR(dd.Name)

if fsRestore != nil {
@@ -184,7 +196,7 @@ func (r *DataDownloadReconciler) Reconcile(ctx context.Context, req ctrl.Request
// Update status to InProgress
original := dd.DeepCopy()
dd.Status.Phase = velerov2alpha1api.DataDownloadPhaseInProgress
dd.Status.StartTimestamp = &metav1.Time{Time: r.clock.Now()}
dd.Status.StartTimestamp = &metav1.Time{Time: r.Clock.Now()}
if err := r.client.Patch(ctx, dd, client.MergeFrom(original)); err != nil {
log.WithError(err).Error("Unable to update status to in progress")
return ctrl.Result{}, err
@@ -271,7 +283,7 @@ func (r *DataDownloadReconciler) OnDataDownloadCompleted(ctx context.Context, na

original := dd.DeepCopy()
dd.Status.Phase = velerov2alpha1api.DataDownloadPhaseCompleted
dd.Status.CompletionTimestamp = &metav1.Time{Time: r.clock.Now()}
dd.Status.CompletionTimestamp = &metav1.Time{Time: r.Clock.Now()}
if err := r.client.Patch(ctx, &dd, client.MergeFrom(original)); err != nil {
log.WithError(err).Error("error updating data download status")
} else {
@@ -313,9 +325,9 @@ func (r *DataDownloadReconciler) OnDataDownloadCancelled(ctx context.Context, na
original := dd.DeepCopy()
dd.Status.Phase = velerov2alpha1api.DataDownloadPhaseCanceled
if dd.Status.StartTimestamp.IsZero() {
dd.Status.StartTimestamp = &metav1.Time{Time: r.clock.Now()}
dd.Status.StartTimestamp = &metav1.Time{Time: r.Clock.Now()}
}
dd.Status.CompletionTimestamp = &metav1.Time{Time: r.clock.Now()}
dd.Status.CompletionTimestamp = &metav1.Time{Time: r.Clock.Now()}
if err := r.client.Patch(ctx, &dd, client.MergeFrom(original)); err != nil {
log.WithError(err).Error("error updating data download status")
}
@@ -382,12 +394,7 @@ func (r *DataDownloadReconciler) SetupWithManager(mgr ctrl.Manager) error {
func (r *DataDownloadReconciler) findSnapshotRestoreForPod(podObj client.Object) []reconcile.Request {
pod := podObj.(*v1.Pod)

dd := &velerov2alpha1api.DataDownload{}
err := r.client.Get(context.Background(), types.NamespacedName{
Namespace: pod.Namespace,
Name: pod.Labels[velerov1api.DataDownloadLabel],
}, dd)

dd, err := r.FindDataDownloadByPod(pod)
if err != nil {
r.logger.WithField("Restore pod", pod.Name).WithError(err).Error("unable to get DataDownload")
return []reconcile.Request{}
@@ -416,6 +423,19 @@ func (r *DataDownloadReconciler) findSnapshotRestoreForPod(podObj client.Object)
return requests
}

func (r *DataDownloadReconciler) FindDataDownloadByPod(pod *corev1.Pod) (*velerov2alpha1api.DataDownload, error) {
dd := &velerov2alpha1api.DataDownload{}
err := r.client.Get(context.Background(), types.NamespacedName{
Namespace: pod.Namespace,
Name: pod.Labels[velerov1api.DataDownloadLabel],
}, dd)

if err != nil {
return nil, errors.Wrapf(err, "error to find DataDownload by pod %s/%s", pod.Namespace, pod.Name)
}
return dd, nil
}

func (r *DataDownloadReconciler) patchDataDownload(ctx context.Context, req *velerov2alpha1api.DataDownload, mutate func(*velerov2alpha1api.DataDownload)) error {
original := req.DeepCopy()
mutate(req)
@@ -442,7 +462,7 @@ func (r *DataDownloadReconciler) updateStatusToFailed(ctx context.Context, dd *v
original := dd.DeepCopy()
dd.Status.Phase = velerov2alpha1api.DataDownloadPhaseFailed
dd.Status.Message = errors.WithMessage(err, msg).Error()
dd.Status.CompletionTimestamp = &metav1.Time{Time: r.clock.Now()}
dd.Status.CompletionTimestamp = &metav1.Time{Time: r.Clock.Now()}

if patchErr := r.client.Patch(ctx, dd, client.MergeFrom(original)); patchErr != nil {
log.WithError(patchErr).Error("error updating DataDownload status")
50 changes: 34 additions & 16 deletions pkg/controller/data_upload_controller.go
@@ -61,7 +61,7 @@ type DataUploadReconciler struct {
kubeClient kubernetes.Interface
csiSnapshotClient snapshotter.SnapshotV1Interface
repoEnsurer *repository.Ensurer
clock clocks.WithTickerAndDelayedExecution
Clock clocks.WithTickerAndDelayedExecution
credentialGetter *credentials.CredentialGetter
nodeName string
fileSystem filesystem.Interface
@@ -77,7 +77,7 @@ func NewDataUploadReconciler(client client.Client, kubeClient kubernetes.Interfa
client: client,
kubeClient: kubeClient,
csiSnapshotClient: csiSnapshotClient,
clock: clock,
Clock: clock,
credentialGetter: cred,
nodeName: nodeName,
fileSystem: fs,
@@ -134,18 +134,29 @@ func (r *DataUploadReconciler) Reconcile(ctx context.Context, req ctrl.Request)

log.Info("Data upload is accepted")

if du.Spec.Cancel {
r.OnDataUploadCancelled(ctx, du.GetNamespace(), du.GetName())
return ctrl.Result{}, nil
}

exposeParam := r.setupExposeParam(&du)

if err := ep.Expose(ctx, getOwnerObject(&du), exposeParam); err != nil {
return r.errorOut(ctx, &du, err, "error to expose snapshot", log)
}
log.Info("Snapshot is exposed")
// ep.Expose() will trigger to create one pod whose volume is restored by a given volume snapshot,
// Expose() will trigger to create one pod whose volume is restored by a given volume snapshot,
// but the pod maybe is not in the same node of the current controller, so we need to return it here.
// And then only the controller who is in the same node could do the rest work.
return ctrl.Result{}, nil
} else if du.Status.Phase == velerov2alpha1api.DataUploadPhasePrepared {
log.Info("Data upload is prepared")

if du.Spec.Cancel {
r.OnDataUploadCancelled(ctx, du.GetNamespace(), du.GetName())
return ctrl.Result{}, nil
}

fsBackup := r.dataPathMgr.GetAsyncBR(du.Name)
if fsBackup != nil {
log.Info("Cancellable data path is already started")
@@ -183,7 +194,7 @@ func (r *DataUploadReconciler) Reconcile(ctx context.Context, req ctrl.Request)
// Update status to InProgress
original := du.DeepCopy()
du.Status.Phase = velerov2alpha1api.DataUploadPhaseInProgress
du.Status.StartTimestamp = &metav1.Time{Time: r.clock.Now()}
du.Status.StartTimestamp = &metav1.Time{Time: r.Clock.Now()}
if err := r.client.Patch(ctx, &du, client.MergeFrom(original)); err != nil {
return r.errorOut(ctx, &du, err, "error updating dataupload status", log)
}
@@ -277,7 +288,7 @@ func (r *DataUploadReconciler) OnDataUploadCompleted(ctx context.Context, namesp
du.Status.Path = result.Backup.Source.ByPath
du.Status.Phase = velerov2alpha1api.DataUploadPhaseCompleted
du.Status.SnapshotID = result.Backup.SnapshotID
du.Status.CompletionTimestamp = &metav1.Time{Time: r.clock.Now()}
du.Status.CompletionTimestamp = &metav1.Time{Time: r.Clock.Now()}
if result.Backup.EmptySnapshot {
du.Status.Message = "volume was empty so no data was upload"
}
@@ -331,9 +342,9 @@ func (r *DataUploadReconciler) OnDataUploadCancelled(ctx context.Context, namesp
original := du.DeepCopy()
du.Status.Phase = velerov2alpha1api.DataUploadPhaseCanceled
if du.Status.StartTimestamp.IsZero() {
du.Status.StartTimestamp = &metav1.Time{Time: r.clock.Now()}
du.Status.StartTimestamp = &metav1.Time{Time: r.Clock.Now()}
}
du.Status.CompletionTimestamp = &metav1.Time{Time: r.clock.Now()}
du.Status.CompletionTimestamp = &metav1.Time{Time: r.Clock.Now()}
if err := r.client.Patch(ctx, &du, client.MergeFrom(original)); err != nil {
log.WithError(err).Error("error updating DataUpload status")
}
@@ -399,13 +410,7 @@ func (r *DataUploadReconciler) SetupWithManager(mgr ctrl.Manager) error {

func (r *DataUploadReconciler) findDataUploadForPod(podObj client.Object) []reconcile.Request {
pod := podObj.(*corev1.Pod)

du := &velerov2alpha1api.DataUpload{}
err := r.client.Get(context.Background(), types.NamespacedName{
Namespace: pod.Namespace,
Name: pod.Labels[velerov1api.DataUploadLabel],
}, du)

du, err := r.FindDataUploadByPod(pod)
if err != nil {
r.logger.WithField("Backup pod", pod.Name).WithError(err).Error("unable to get dataupload")
return []reconcile.Request{}
@@ -430,6 +435,19 @@ func (r *DataUploadReconciler) findDataUploadForPod(podObj client.Object) []reco
return []reconcile.Request{requests}
}

func (r *DataUploadReconciler) FindDataUploadByPod(pod *corev1.Pod) (*velerov2alpha1api.DataUpload, error) {
du := &velerov2alpha1api.DataUpload{}
err := r.client.Get(context.Background(), types.NamespacedName{
Namespace: pod.Namespace,
Name: pod.Labels[velerov1api.DataUploadLabel],
}, du)

if err != nil {
return nil, errors.Wrapf(err, "error to find DataUpload by pod %s/%s", pod.Namespace, pod.Name)
}
return du, nil
}

func (r *DataUploadReconciler) patchDataUpload(ctx context.Context, req *velerov2alpha1api.DataUpload, mutate func(*velerov2alpha1api.DataUpload)) error {
original := req.DeepCopy()
mutate(req)
@@ -462,10 +480,10 @@ func (r *DataUploadReconciler) updateStatusToFailed(ctx context.Context, du *vel
du.Status.Phase = velerov2alpha1api.DataUploadPhaseFailed
du.Status.Message = errors.WithMessage(err, msg).Error()
if du.Status.StartTimestamp.IsZero() {
du.Status.StartTimestamp = &metav1.Time{Time: r.clock.Now()}
du.Status.StartTimestamp = &metav1.Time{Time: r.Clock.Now()}
}

du.Status.CompletionTimestamp = &metav1.Time{Time: r.clock.Now()}
du.Status.CompletionTimestamp = &metav1.Time{Time: r.Clock.Now()}
if patchErr := r.client.Patch(ctx, du, client.MergeFrom(original)); patchErr != nil {
log.WithError(patchErr).Error("error updating DataUpload status")
}