Skip to content

Commit

Permalink
Mark dataupload datadownload status failed when velero pod restart
Browse files Browse the repository at this point in the history
Signed-off-by: Ming Qiu <mqiu@vmware.com>
  • Loading branch information
qiuming-best committed Jul 7, 2023
1 parent e54a8af commit 138ae93
Show file tree
Hide file tree
Showing 6 changed files with 234 additions and 36 deletions.
52 changes: 50 additions & 2 deletions pkg/cmd/cli/nodeagent/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,11 +256,15 @@ func (s *nodeAgentServer) run() {
s.logger.WithError(err).Fatal("Unable to create the pod volume restore controller")
}

if err = controller.NewDataUploadReconciler(s.mgr.GetClient(), s.kubeClient, s.csiSnapshotClient.SnapshotV1(), repoEnsurer, clock.RealClock{}, credentialGetter, s.nodeName, s.fileSystem, s.logger).SetupWithManager(s.mgr); err != nil {
dataUploadReconciler := controller.NewDataUploadReconciler(s.mgr.GetClient(), s.kubeClient, s.csiSnapshotClient.SnapshotV1(), repoEnsurer, clock.RealClock{}, credentialGetter, s.nodeName, s.fileSystem, s.logger)
s.markDataUploadsCancel(s.mgr.GetClient(), dataUploadReconciler)
if err = dataUploadReconciler.SetupWithManager(s.mgr); err != nil {
s.logger.WithError(err).Fatal("Unable to create the data upload controller")
}

if err = controller.NewDataDownloadReconciler(s.mgr.GetClient(), s.kubeClient, repoEnsurer, credentialGetter, s.nodeName, s.logger).SetupWithManager(s.mgr); err != nil {
dataDownloadReconciler := controller.NewDataDownloadReconciler(s.mgr.GetClient(), s.kubeClient, repoEnsurer, credentialGetter, s.nodeName, s.logger)
s.markDataDownloadsCancel(s.mgr.GetClient(), dataDownloadReconciler)
if err = dataDownloadReconciler.SetupWithManager(s.mgr); err != nil {
s.logger.WithError(err).Fatal("Unable to create the data download controller")
}

Expand Down Expand Up @@ -333,6 +337,50 @@ func (s *nodeAgentServer) markInProgressCRsFailed() {
s.markInProgressPVRsFailed(client)
}

func (s *nodeAgentServer) markDataUploadsCancel(client ctrlclient.Client, r *controller.DataUploadReconciler) {
if dataUploads, err := r.FindDataUploads(s.ctx, s.mgr.GetClient(), s.namespace); err != nil {
s.logger.WithError(errors.WithStack(err)).Error("failed to find data downloads")
} else {
for _, du := range dataUploads {
updated := du.DeepCopy()
updated.Spec.Cancel = true
updated.Status.Message = fmt.Sprintf("found a dataupload with status %q during the node-agent starting, mark it as cancel", du.Status.Phase)
du.Status.Phase = velerov2alpha1api.DataUploadPhaseCanceled
if du.Status.StartTimestamp.IsZero() {
du.Status.StartTimestamp = &metav1.Time{Time: r.Clock.Now()}
}
du.Status.CompletionTimestamp = &metav1.Time{Time: r.Clock.Now()}
if err := client.Patch(s.ctx, updated, ctrlclient.MergeFrom(du)); err != nil {
s.logger.WithError(errors.WithStack(err)).Errorf("failed to mark datadownload %q cancel", du.GetName())
continue
}
s.logger.WithField("dataupload", du.GetName()).Warn(du.Status.Message)
}
}
}

func (s *nodeAgentServer) markDataDownloadsCancel(client ctrlclient.Client, r *controller.DataDownloadReconciler) {
if dataDownloads, err := r.FindDataDownloads(s.ctx, s.mgr.GetClient(), s.namespace); err != nil {
s.logger.WithError(errors.WithStack(err)).Error("failed to find data downloads")
} else {
for _, dd := range dataDownloads {
updated := dd.DeepCopy()
updated.Spec.Cancel = true
updated.Status.Message = fmt.Sprintf("found a datadownload with status %q during the node-agent starting, mark it as cancel", dd.Status.Phase)
dd.Status.Phase = velerov2alpha1api.DataDownloadPhaseCanceled
if dd.Status.StartTimestamp.IsZero() {
dd.Status.StartTimestamp = &metav1.Time{Time: r.Clock.Now()}
}
dd.Status.CompletionTimestamp = &metav1.Time{Time: r.Clock.Now()}
if err := client.Patch(s.ctx, updated, ctrlclient.MergeFrom(dd)); err != nil {
s.logger.WithError(errors.WithStack(err)).Errorf("failed to mark datadownload %q cancel", dd.GetName())
continue
}
s.logger.WithField("datadownload", dd.GetName()).Warn(dd.Status.Message)
}
}
}

func (s *nodeAgentServer) markInProgressPVBsFailed(client ctrlclient.Client) {
pvbs := &velerov1api.PodVolumeBackupList{}
if err := client.List(s.ctx, pvbs, &ctrlclient.MatchingFields{"metadata.namespace": s.namespace}); err != nil {
Expand Down
44 changes: 44 additions & 0 deletions pkg/cmd/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1078,6 +1078,7 @@ func markInProgressBackupsFailed(ctx context.Context, client ctrlclient.Client,
continue
}
log.WithField("backup", backup.GetName()).Warn(updated.Status.FailureReason)
markDataUploadsCancel(ctx, client, backup, log)
}
}

Expand All @@ -1101,5 +1102,48 @@ func markInProgressRestoresFailed(ctx context.Context, client ctrlclient.Client,
continue
}
log.WithField("restore", restore.GetName()).Warn(updated.Status.FailureReason)
markDataDownloadsCancel(ctx, client, restore, log)
}
}

func markDataUploadsCancel(ctx context.Context, client ctrlclient.Client, backup velerov1api.Backup, log logrus.FieldLogger) {
dataUploads := &velerov2alpha1api.DataUploadList{}

if err := client.List(ctx, dataUploads, &ctrlclient.MatchingLabels{velerov1api.BackupUIDLabel: string(backup.GetUID())}); err != nil {
log.WithError(errors.WithStack(err)).Error("failed to list dataUploads")
return
}

for i := range dataUploads.Items {
du := dataUploads.Items[i]
updated := du.DeepCopy()
updated.Spec.Cancel = true
updated.Status.Message = fmt.Sprintf("found a dataupload with status %q during the velero server starting, mark it as cancel", du.Status.Phase)
if err := client.Patch(ctx, updated, ctrlclient.MergeFrom(&du)); err != nil {
log.WithError(errors.WithStack(err)).Errorf("failed to mark dataupload %q cancel", du.GetName())
continue
}
log.WithField("dataupload", du.GetName()).Warn(du.Status.Message)
}
}

func markDataDownloadsCancel(ctx context.Context, client ctrlclient.Client, restore velerov1api.Restore, log logrus.FieldLogger) {
dataDownloads := &velerov2alpha1api.DataDownloadList{}

if err := client.List(ctx, dataDownloads, &ctrlclient.MatchingLabels{velerov1api.RestoreUIDLabel: string(restore.GetUID())}); err != nil {
log.WithError(errors.WithStack(err)).Error("failed to list dataDownloads")
return
}

for i := range dataDownloads.Items {
dd := dataDownloads.Items[i]
updated := dd.DeepCopy()
updated.Spec.Cancel = true
updated.Status.Message = fmt.Sprintf("found a datadownload with status %q during the velero server starting, mark it as cancel", dd.Status.Phase)
if err := client.Patch(ctx, updated, ctrlclient.MergeFrom(&dd)); err != nil {
log.WithError(errors.WithStack(err)).Errorf("failed to mark dataupload %q cancel", dd.GetName())
continue
}
log.WithField("datadownload", dd.GetName()).Warn(dd.Status.Message)
}
}
74 changes: 60 additions & 14 deletions pkg/controller/data_download_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
v1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/kubernetes"
"k8s.io/utils/clock"
Expand Down Expand Up @@ -57,7 +58,7 @@ type DataDownloadReconciler struct {
logger logrus.FieldLogger
credentialGetter *credentials.CredentialGetter
fileSystem filesystem.Interface
clock clock.WithTickerAndDelayedExecution
Clock clock.WithTickerAndDelayedExecution
restoreExposer exposer.GenericRestoreExposer
nodeName string
repositoryEnsurer *repository.Ensurer
Expand All @@ -72,7 +73,7 @@ func NewDataDownloadReconciler(client client.Client, kubeClient kubernetes.Inter
logger: logger.WithField("controller", "DataDownload"),
credentialGetter: credentialGetter,
fileSystem: filesystem.NewFileSystem(),
clock: &clock.RealClock{},
Clock: &clock.RealClock{},
nodeName: nodeName,
repositoryEnsurer: repoEnsurer,
restoreExposer: exposer.NewGenericRestoreExposer(kubeClient, logger),
Expand Down Expand Up @@ -132,9 +133,14 @@ func (r *DataDownloadReconciler) Reconcile(ctx context.Context, req ctrl.Request

log.Info("Data download is accepted")

if dd.Spec.Cancel {
r.OnDataDownloadCancelled(ctx, dd.GetNamespace(), dd.GetName())
return ctrl.Result{}, nil
}

hostingPodLabels := map[string]string{velerov1api.DataDownloadLabel: dd.Name}

// ep.Expose() will trigger to create one pod whose volume is restored by a given volume snapshot,
// Expose() will trigger to create one pod whose volume is restored by a given volume snapshot,
// but the pod maybe is not in the same node of the current controller, so we need to return it here.
// And then only the controller who is in the same node could do the rest work.
err = r.restoreExposer.Expose(ctx, getDataDownloadOwnerObject(dd), dd.Spec.TargetVolume.PVC, dd.Spec.TargetVolume.Namespace, hostingPodLabels, dd.Spec.OperationTimeout.Duration)
Expand All @@ -146,6 +152,12 @@ func (r *DataDownloadReconciler) Reconcile(ctx context.Context, req ctrl.Request
return ctrl.Result{}, nil
} else if dd.Status.Phase == velerov2alpha1api.DataDownloadPhasePrepared {
log.Info("Data download is prepared")

if dd.Spec.Cancel {
r.OnDataDownloadCancelled(ctx, dd.GetNamespace(), dd.GetName())
return ctrl.Result{}, nil
}

fsRestore := r.dataPathMgr.GetAsyncBR(dd.Name)

if fsRestore != nil {
Expand Down Expand Up @@ -184,7 +196,7 @@ func (r *DataDownloadReconciler) Reconcile(ctx context.Context, req ctrl.Request
// Update status to InProgress
original := dd.DeepCopy()
dd.Status.Phase = velerov2alpha1api.DataDownloadPhaseInProgress
dd.Status.StartTimestamp = &metav1.Time{Time: r.clock.Now()}
dd.Status.StartTimestamp = &metav1.Time{Time: r.Clock.Now()}
if err := r.client.Patch(ctx, dd, client.MergeFrom(original)); err != nil {
log.WithError(err).Error("Unable to update status to in progress")
return ctrl.Result{}, err
Expand Down Expand Up @@ -271,7 +283,7 @@ func (r *DataDownloadReconciler) OnDataDownloadCompleted(ctx context.Context, na

original := dd.DeepCopy()
dd.Status.Phase = velerov2alpha1api.DataDownloadPhaseCompleted
dd.Status.CompletionTimestamp = &metav1.Time{Time: r.clock.Now()}
dd.Status.CompletionTimestamp = &metav1.Time{Time: r.Clock.Now()}
if err := r.client.Patch(ctx, &dd, client.MergeFrom(original)); err != nil {
log.WithError(err).Error("error updating data download status")
} else {
Expand Down Expand Up @@ -313,9 +325,9 @@ func (r *DataDownloadReconciler) OnDataDownloadCancelled(ctx context.Context, na
original := dd.DeepCopy()
dd.Status.Phase = velerov2alpha1api.DataDownloadPhaseCanceled
if dd.Status.StartTimestamp.IsZero() {
dd.Status.StartTimestamp = &metav1.Time{Time: r.clock.Now()}
dd.Status.StartTimestamp = &metav1.Time{Time: r.Clock.Now()}
}
dd.Status.CompletionTimestamp = &metav1.Time{Time: r.clock.Now()}
dd.Status.CompletionTimestamp = &metav1.Time{Time: r.Clock.Now()}
if err := r.client.Patch(ctx, &dd, client.MergeFrom(original)); err != nil {
log.WithError(err).Error("error updating data download status")
}
Expand Down Expand Up @@ -382,15 +394,12 @@ func (r *DataDownloadReconciler) SetupWithManager(mgr ctrl.Manager) error {
func (r *DataDownloadReconciler) findSnapshotRestoreForPod(podObj client.Object) []reconcile.Request {
pod := podObj.(*v1.Pod)

dd := &velerov2alpha1api.DataDownload{}
err := r.client.Get(context.Background(), types.NamespacedName{
Namespace: pod.Namespace,
Name: pod.Labels[velerov1api.DataDownloadLabel],
}, dd)

dd, err := r.findDataDownloadByPod(*pod)
if err != nil {
r.logger.WithField("Restore pod", pod.Name).WithError(err).Error("unable to get DataDownload")
return []reconcile.Request{}
} else if dd == nil {
return []reconcile.Request{}
}

if dd.Status.Phase != velerov2alpha1api.DataDownloadPhaseAccepted {
Expand All @@ -416,6 +425,43 @@ func (r *DataDownloadReconciler) findSnapshotRestoreForPod(podObj client.Object)
return requests
}

func (r *DataDownloadReconciler) FindDataDownloads(ctx context.Context, cli client.Client, ns string) ([]*velerov2alpha1api.DataDownload, error) {
pods := &v1.PodList{}
var dataDownloads []*velerov2alpha1api.DataDownload
if err := cli.List(ctx, pods, &client.ListOptions{LabelSelector: labels.Everything(), Namespace: ns}); err != nil {
r.logger.WithError(errors.WithStack(err)).Error("failed to list pods on current node")
return nil, errors.Wrapf(err, "failed to list pods on current node")
}

for _, pod := range pods.Items {
dd, err := r.findDataDownloadByPod(pod)
if err != nil {
r.logger.WithError(errors.WithStack(err)).Errorf("failed to get dataDownload by pod %s/%s", pod.Namespace, pod.Name)
continue
} else if dd != nil {
dataDownloads = append(dataDownloads, dd)
}
}
return dataDownloads, nil
}

func (r *DataDownloadReconciler) findDataDownloadByPod(pod v1.Pod) (*velerov2alpha1api.DataDownload, error) {
if label, exist := pod.Labels[velerov1api.DataDownloadLabel]; exist {
dd := &velerov2alpha1api.DataDownload{}
err := r.client.Get(context.Background(), types.NamespacedName{
Namespace: pod.Namespace,
Name: label,
}, dd)

if err != nil {
return nil, errors.Wrapf(err, "error to find DataDownload by pod %s/%s", pod.Namespace, pod.Name)
}
return dd, nil
}

return nil, nil
}

func (r *DataDownloadReconciler) patchDataDownload(ctx context.Context, req *velerov2alpha1api.DataDownload, mutate func(*velerov2alpha1api.DataDownload)) error {
original := req.DeepCopy()
mutate(req)
Expand All @@ -442,7 +488,7 @@ func (r *DataDownloadReconciler) updateStatusToFailed(ctx context.Context, dd *v
original := dd.DeepCopy()
dd.Status.Phase = velerov2alpha1api.DataDownloadPhaseFailed
dd.Status.Message = errors.WithMessage(err, msg).Error()
dd.Status.CompletionTimestamp = &metav1.Time{Time: r.clock.Now()}
dd.Status.CompletionTimestamp = &metav1.Time{Time: r.Clock.Now()}

if patchErr := r.client.Patch(ctx, dd, client.MergeFrom(original)); patchErr != nil {
log.WithError(patchErr).Error("error updating DataDownload status")
Expand Down
10 changes: 9 additions & 1 deletion pkg/controller/data_download_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -550,6 +550,14 @@ func TestFindDataDownloadForPod(t *testing.T) {
assert.Equal(t, du.Namespace, requests[0].Namespace)
assert.Equal(t, du.Name, requests[0].Name)
},
}, {
name: "no selected label found for pod",
du: dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhaseAccepted).Result(),
pod: builder.ForPod(velerov1api.DefaultNamespace, dataDownloadName).Result(),
checkFunc: func(du *velerov2alpha1api.DataDownload, requests []reconcile.Request) {
// Assert that the function returns a single request
assert.Empty(t, requests)
},
}, {
name: "no matched pod",
du: dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhaseAccepted).Result(),
Expand All @@ -559,7 +567,7 @@ func TestFindDataDownloadForPod(t *testing.T) {
},
},
{
name: "dataDownload not accepte",
name: "dataDownload not accept",
du: dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhaseInProgress).Result(),
pod: builder.ForPod(velerov1api.DefaultNamespace, dataDownloadName).Labels(map[string]string{velerov1api.DataDownloadLabel: dataDownloadName}).Result(),
checkFunc: func(du *velerov2alpha1api.DataDownload, requests []reconcile.Request) {
Expand Down
Loading

0 comments on commit 138ae93

Please sign in to comment.