From 138ae9324d540367ed016e5f4828840b6f3139c3 Mon Sep 17 00:00:00 2001
From: Ming Qiu
Date: Wed, 5 Jul 2023 15:55:42 +0800
Subject: [PATCH] Mark dataupload/datadownload status canceled when the velero
 pod restarts

Signed-off-by: Ming Qiu
---
 pkg/cmd/cli/nodeagent/server.go               | 52 ++++++++++++-
 pkg/cmd/server/server.go                      | 44 +++++++++++
 pkg/controller/data_download_controller.go    | 74 ++++++++++++++----
 .../data_download_controller_test.go          | 10 ++-
 pkg/controller/data_upload_controller.go      | 76 +++++++++++++++----
 pkg/controller/data_upload_controller_test.go | 14 +++-
 6 files changed, 234 insertions(+), 36 deletions(-)

diff --git a/pkg/cmd/cli/nodeagent/server.go b/pkg/cmd/cli/nodeagent/server.go
index 3f635602ed..4108eb6d7d 100644
--- a/pkg/cmd/cli/nodeagent/server.go
+++ b/pkg/cmd/cli/nodeagent/server.go
@@ -256,11 +256,15 @@ func (s *nodeAgentServer) run() {
 		s.logger.WithError(err).Fatal("Unable to create the pod volume restore controller")
 	}
 
-	if err = controller.NewDataUploadReconciler(s.mgr.GetClient(), s.kubeClient, s.csiSnapshotClient.SnapshotV1(), repoEnsurer, clock.RealClock{}, credentialGetter, s.nodeName, s.fileSystem, s.logger).SetupWithManager(s.mgr); err != nil {
+	dataUploadReconciler := controller.NewDataUploadReconciler(s.mgr.GetClient(), s.kubeClient, s.csiSnapshotClient.SnapshotV1(), repoEnsurer, clock.RealClock{}, credentialGetter, s.nodeName, s.fileSystem, s.logger)
+	s.markDataUploadsCancel(s.mgr.GetClient(), dataUploadReconciler)
+	if err = dataUploadReconciler.SetupWithManager(s.mgr); err != nil {
 		s.logger.WithError(err).Fatal("Unable to create the data upload controller")
 	}
 
-	if err = controller.NewDataDownloadReconciler(s.mgr.GetClient(), s.kubeClient, repoEnsurer, credentialGetter, s.nodeName, s.logger).SetupWithManager(s.mgr); err != nil {
+	dataDownloadReconciler := controller.NewDataDownloadReconciler(s.mgr.GetClient(), s.kubeClient, repoEnsurer, credentialGetter, s.nodeName, s.logger)
+	s.markDataDownloadsCancel(s.mgr.GetClient(), dataDownloadReconciler)
+	if err = dataDownloadReconciler.SetupWithManager(s.mgr); err != nil {
 		s.logger.WithError(err).Fatal("Unable to create the data download controller")
 	}
 
@@ -333,6 +337,50 @@ func (s *nodeAgentServer) markInProgressCRsFailed() {
 	s.markInProgressPVRsFailed(client)
 }
 
+func (s *nodeAgentServer) markDataUploadsCancel(client ctrlclient.Client, r *controller.DataUploadReconciler) {
+	if dataUploads, err := r.FindDataUploads(s.ctx, client, s.namespace); err != nil {
+		s.logger.WithError(errors.WithStack(err)).Error("failed to find data uploads")
+	} else {
+		for _, du := range dataUploads {
+			updated := du.DeepCopy()
+			updated.Spec.Cancel = true
+			updated.Status.Message = fmt.Sprintf("found a dataupload with status %q during node-agent startup, marking it as canceled", du.Status.Phase)
+			updated.Status.Phase = velerov2alpha1api.DataUploadPhaseCanceled
+			if du.Status.StartTimestamp.IsZero() {
+				updated.Status.StartTimestamp = &metav1.Time{Time: r.Clock.Now()}
+			}
+			updated.Status.CompletionTimestamp = &metav1.Time{Time: r.Clock.Now()}
+			if err := client.Patch(s.ctx, updated, ctrlclient.MergeFrom(du)); err != nil {
+				s.logger.WithError(errors.WithStack(err)).Errorf("failed to mark dataupload %q as canceled", du.GetName())
+				continue
+			}
+			s.logger.WithField("dataupload", du.GetName()).Warn(updated.Status.Message)
+		}
+	}
+}
+
+func (s *nodeAgentServer) markDataDownloadsCancel(client ctrlclient.Client, r *controller.DataDownloadReconciler) {
+	if dataDownloads, err := r.FindDataDownloads(s.ctx, client, s.namespace); err != nil {
+		s.logger.WithError(errors.WithStack(err)).Error("failed to find data downloads")
+	} else {
+		for _, dd := range dataDownloads {
+			updated := dd.DeepCopy()
+			updated.Spec.Cancel = true
+			updated.Status.Message = fmt.Sprintf("found a datadownload with status %q during node-agent startup, marking it as canceled", dd.Status.Phase)
+			updated.Status.Phase = velerov2alpha1api.DataDownloadPhaseCanceled
+			if dd.Status.StartTimestamp.IsZero() {
+				updated.Status.StartTimestamp = &metav1.Time{Time: r.Clock.Now()}
+			}
+			updated.Status.CompletionTimestamp = &metav1.Time{Time: r.Clock.Now()}
+			if err := client.Patch(s.ctx, updated, ctrlclient.MergeFrom(dd)); err != nil {
+				s.logger.WithError(errors.WithStack(err)).Errorf("failed to mark datadownload %q as canceled", dd.GetName())
+				continue
+			}
+			s.logger.WithField("datadownload", dd.GetName()).Warn(updated.Status.Message)
+		}
+	}
+}
+
 func (s *nodeAgentServer) markInProgressPVBsFailed(client ctrlclient.Client) {
 	pvbs := &velerov1api.PodVolumeBackupList{}
 	if err := client.List(s.ctx, pvbs, &ctrlclient.MatchingFields{"metadata.namespace": s.namespace}); err != nil {
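// A minimal sketch of the client.MergeFrom contract the two helpers above
// depend on, assuming a fetched DataUpload `du` and a controller-runtime
// client `cli`: Patch sends only the fields where the object passed to Patch
// differs from the base captured by MergeFrom, so every mutation must land on
// the copy that is sent, never on the base.
//
//	updated := du.DeepCopy()
//	updated.Spec.Cancel = true
//	updated.Status.Phase = velerov2alpha1api.DataUploadPhaseCanceled
//	// the resulting merge patch carries just spec.cancel and status.phase
//	if err := cli.Patch(ctx, updated, ctrlclient.MergeFrom(du)); err != nil {
//		// du still reflects the state read from the cluster
//	}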
diff --git a/pkg/cmd/server/server.go b/pkg/cmd/server/server.go
index 00a4456f0c..771a560411 100644
--- a/pkg/cmd/server/server.go
+++ b/pkg/cmd/server/server.go
@@ -1078,6 +1078,7 @@ func markInProgressBackupsFailed(ctx context.Context, client ctrlclient.Client,
 			continue
 		}
 		log.WithField("backup", backup.GetName()).Warn(updated.Status.FailureReason)
+		markDataUploadsCancel(ctx, client, backup, log)
 	}
 }
 
@@ -1101,5 +1102,48 @@ func markInProgressRestoresFailed(ctx context.Context, client ctrlclient.Client,
 			continue
 		}
 		log.WithField("restore", restore.GetName()).Warn(updated.Status.FailureReason)
+		markDataDownloadsCancel(ctx, client, restore, log)
+	}
+}
+
+func markDataUploadsCancel(ctx context.Context, client ctrlclient.Client, backup velerov1api.Backup, log logrus.FieldLogger) {
+	dataUploads := &velerov2alpha1api.DataUploadList{}
+
+	if err := client.List(ctx, dataUploads, &ctrlclient.MatchingLabels{velerov1api.BackupUIDLabel: string(backup.GetUID())}); err != nil {
+		log.WithError(errors.WithStack(err)).Error("failed to list dataUploads")
+		return
+	}
+
+	for i := range dataUploads.Items {
+		du := dataUploads.Items[i]
+		updated := du.DeepCopy()
+		updated.Spec.Cancel = true
+		updated.Status.Message = fmt.Sprintf("found a dataupload with status %q during velero server startup, marking it as canceled", du.Status.Phase)
+		if err := client.Patch(ctx, updated, ctrlclient.MergeFrom(&du)); err != nil {
+			log.WithError(errors.WithStack(err)).Errorf("failed to mark dataupload %q as canceled", du.GetName())
+			continue
+		}
+		log.WithField("dataupload", du.GetName()).Warn(updated.Status.Message)
+	}
+}
+
+func markDataDownloadsCancel(ctx context.Context, client ctrlclient.Client, restore velerov1api.Restore, log logrus.FieldLogger) {
+	dataDownloads := &velerov2alpha1api.DataDownloadList{}
+
+	if err := client.List(ctx, dataDownloads, &ctrlclient.MatchingLabels{velerov1api.RestoreUIDLabel: string(restore.GetUID())}); err != nil {
+		log.WithError(errors.WithStack(err)).Error("failed to list dataDownloads")
+		return
+	}
+
+	for i := range dataDownloads.Items {
+		dd := dataDownloads.Items[i]
+		updated := dd.DeepCopy()
+		updated.Spec.Cancel = true
+		updated.Status.Message = fmt.Sprintf("found a datadownload with status %q during velero server startup, marking it as canceled", dd.Status.Phase)
+		if err := client.Patch(ctx, updated, ctrlclient.MergeFrom(&dd)); err != nil {
+			log.WithError(errors.WithStack(err)).Errorf("failed to mark datadownload %q as canceled", dd.GetName())
+			continue
+		}
+		log.WithField("datadownload", dd.GetName()).Warn(updated.Status.Message)
 	}
 }
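// The two lookups above assume each DataUpload/DataDownload was labeled with
// the UID of its owning Backup/Restore when it was created. A minimal sketch
// of what the producing side would look like (the object name, volume, and
// the label util helper are illustrative, not taken from this patch):
//
//	du := &velerov2alpha1api.DataUpload{
//		ObjectMeta: metav1.ObjectMeta{
//			Namespace: backup.Namespace,
//			Name:      backup.Name + "-vol1",
//			Labels: map[string]string{
//				velerov1api.BackupNameLabel: label.GetValidName(backup.Name),
//				velerov1api.BackupUIDLabel:  string(backup.GetUID()),
//			},
//		},
//	}
//
// Matching on the UID label rather than the name sidesteps the 63-character
// label-value limit on long backup names and cannot collide with a later
// backup that reuses the same name.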
diff --git a/pkg/controller/data_download_controller.go b/pkg/controller/data_download_controller.go
index 4750cb0edb..d9d51ad64f 100644
--- a/pkg/controller/data_download_controller.go
+++ b/pkg/controller/data_download_controller.go
@@ -26,6 +26,7 @@ import (
 	v1 "k8s.io/api/core/v1"
 	apierrors "k8s.io/apimachinery/pkg/api/errors"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/labels"
 	"k8s.io/apimachinery/pkg/types"
 	"k8s.io/client-go/kubernetes"
 	"k8s.io/utils/clock"
@@ -57,7 +58,7 @@ type DataDownloadReconciler struct {
 	logger            logrus.FieldLogger
 	credentialGetter  *credentials.CredentialGetter
 	fileSystem        filesystem.Interface
-	clock             clock.WithTickerAndDelayedExecution
+	Clock             clock.WithTickerAndDelayedExecution
 	restoreExposer    exposer.GenericRestoreExposer
 	nodeName          string
 	repositoryEnsurer *repository.Ensurer
@@ -72,7 +73,7 @@ func NewDataDownloadReconciler(client client.Client, kubeClient kubernetes.Inter
 		logger:            logger.WithField("controller", "DataDownload"),
 		credentialGetter:  credentialGetter,
 		fileSystem:        filesystem.NewFileSystem(),
-		clock:             &clock.RealClock{},
+		Clock:             &clock.RealClock{},
 		nodeName:          nodeName,
 		repositoryEnsurer: repoEnsurer,
 		restoreExposer:    exposer.NewGenericRestoreExposer(kubeClient, logger),
@@ -132,9 +133,14 @@ func (r *DataDownloadReconciler) Reconcile(ctx context.Context, req ctrl.Request
 		log.Info("Data download is accepted")
 
+		if dd.Spec.Cancel {
+			r.OnDataDownloadCancelled(ctx, dd.GetNamespace(), dd.GetName())
+			return ctrl.Result{}, nil
+		}
+
 		hostingPodLabels := map[string]string{velerov1api.DataDownloadLabel: dd.Name}
 
-		// ep.Expose() will trigger to create one pod whose volume is restored by a given volume snapshot,
+		// Expose() will trigger the creation of one pod whose volume is restored by a given volume snapshot,
 		// but the pod maybe is not in the same node of the current controller, so we need to return it here.
 		// And then only the controller who is in the same node could do the rest work.
 		err = r.restoreExposer.Expose(ctx, getDataDownloadOwnerObject(dd), dd.Spec.TargetVolume.PVC, dd.Spec.TargetVolume.Namespace, hostingPodLabels, dd.Spec.OperationTimeout.Duration)
@@ -146,6 +152,12 @@
 		return ctrl.Result{}, nil
 	} else if dd.Status.Phase == velerov2alpha1api.DataDownloadPhasePrepared {
 		log.Info("Data download is prepared")
+
+		if dd.Spec.Cancel {
+			r.OnDataDownloadCancelled(ctx, dd.GetNamespace(), dd.GetName())
+			return ctrl.Result{}, nil
+		}
+
 		fsRestore := r.dataPathMgr.GetAsyncBR(dd.Name)
 
 		if fsRestore != nil {
@@ -184,7 +196,7 @@ func (r *DataDownloadReconciler) Reconcile(ctx context.Context, req ctrl.Request
 		// Update status to InProgress
 		original := dd.DeepCopy()
 		dd.Status.Phase = velerov2alpha1api.DataDownloadPhaseInProgress
-		dd.Status.StartTimestamp = &metav1.Time{Time: r.clock.Now()}
+		dd.Status.StartTimestamp = &metav1.Time{Time: r.Clock.Now()}
 		if err := r.client.Patch(ctx, dd, client.MergeFrom(original)); err != nil {
 			log.WithError(err).Error("Unable to update status to in progress")
 			return ctrl.Result{}, err
@@ -271,7 +283,7 @@ func (r *DataDownloadReconciler) OnDataDownloadCompleted(ctx context.Context, na
 
 	original := dd.DeepCopy()
 	dd.Status.Phase = velerov2alpha1api.DataDownloadPhaseCompleted
-	dd.Status.CompletionTimestamp = &metav1.Time{Time: r.clock.Now()}
+	dd.Status.CompletionTimestamp = &metav1.Time{Time: r.Clock.Now()}
 	if err := r.client.Patch(ctx, &dd, client.MergeFrom(original)); err != nil {
 		log.WithError(err).Error("error updating data download status")
 	} else {
@@ -313,9 +325,9 @@ func (r *DataDownloadReconciler) OnDataDownloadCancelled(ctx context.Context, na
 	original := dd.DeepCopy()
 	dd.Status.Phase = velerov2alpha1api.DataDownloadPhaseCanceled
 	if dd.Status.StartTimestamp.IsZero() {
-		dd.Status.StartTimestamp = &metav1.Time{Time: r.clock.Now()}
+		dd.Status.StartTimestamp = &metav1.Time{Time: r.Clock.Now()}
 	}
-	dd.Status.CompletionTimestamp = &metav1.Time{Time: r.clock.Now()}
+	dd.Status.CompletionTimestamp = &metav1.Time{Time: r.Clock.Now()}
 	if err := r.client.Patch(ctx, &dd, client.MergeFrom(original)); err != nil {
 		log.WithError(err).Error("error updating data download status")
 	}
@@ -382,15 +394,12 @@ func (r *DataDownloadReconciler) SetupWithManager(mgr ctrl.Manager) error {
 
 func (r *DataDownloadReconciler) findSnapshotRestoreForPod(podObj client.Object) []reconcile.Request {
 	pod := podObj.(*v1.Pod)
-	dd := &velerov2alpha1api.DataDownload{}
-	err := r.client.Get(context.Background(), types.NamespacedName{
-		Namespace: pod.Namespace,
-		Name:      pod.Labels[velerov1api.DataDownloadLabel],
-	}, dd)
-
+	dd, err := r.findDataDownloadByPod(*pod)
 	if err != nil {
 		r.logger.WithField("Restore pod", pod.Name).WithError(err).Error("unable to get DataDownload")
 		return []reconcile.Request{}
+	} else if dd == nil {
+		return []reconcile.Request{}
 	}
 
 	if dd.Status.Phase != velerov2alpha1api.DataDownloadPhaseAccepted {
@@ -416,6 +425,43 @@ func (r *DataDownloadReconciler) findSnapshotRestoreForPod(podObj client.Object)
 	return requests
 }
 
+func (r *DataDownloadReconciler) FindDataDownloads(ctx context.Context, cli client.Client, ns string) ([]*velerov2alpha1api.DataDownload, error) {
+	pods := &v1.PodList{}
+	var dataDownloads []*velerov2alpha1api.DataDownload
+	if err := cli.List(ctx, pods, &client.ListOptions{LabelSelector: labels.Everything(), Namespace: ns}); err != nil {
+		r.logger.WithError(errors.WithStack(err)).Errorf("failed to list pods in namespace %s", ns)
+		return nil, errors.Wrapf(err, "failed to list pods in namespace %s", ns)
+	}
+
+	for _, pod := range pods.Items {
+		dd, err := r.findDataDownloadByPod(pod)
+		if err != nil {
+			r.logger.WithError(errors.WithStack(err)).Errorf("failed to get DataDownload for pod %s/%s", pod.Namespace, pod.Name)
+			continue
+		} else if dd != nil {
+			dataDownloads = append(dataDownloads, dd)
+		}
+	}
+	return dataDownloads, nil
+}
+
+func (r *DataDownloadReconciler) findDataDownloadByPod(pod v1.Pod) (*velerov2alpha1api.DataDownload, error) {
+	if label, exist := pod.Labels[velerov1api.DataDownloadLabel]; exist {
+		dd := &velerov2alpha1api.DataDownload{}
+		err := r.client.Get(context.Background(), types.NamespacedName{
+			Namespace: pod.Namespace,
+			Name:      label,
+		}, dd)
+
+		if err != nil {
+			return nil, errors.Wrapf(err, "failed to find DataDownload for pod %s/%s", pod.Namespace, pod.Name)
+		}
+		return dd, nil
+	}
+
+	return nil, nil
+}
+
 func (r *DataDownloadReconciler) patchDataDownload(ctx context.Context, req *velerov2alpha1api.DataDownload, mutate func(*velerov2alpha1api.DataDownload)) error {
 	original := req.DeepCopy()
 	mutate(req)
@@ -442,7 +488,7 @@ func (r *DataDownloadReconciler) updateStatusToFailed(ctx context.Context, dd *v
 	original := dd.DeepCopy()
 	dd.Status.Phase = velerov2alpha1api.DataDownloadPhaseFailed
 	dd.Status.Message = errors.WithMessage(err, msg).Error()
-	dd.Status.CompletionTimestamp = &metav1.Time{Time: r.clock.Now()}
+	dd.Status.CompletionTimestamp = &metav1.Time{Time: r.Clock.Now()}
 
 	if patchErr := r.client.Patch(ctx, dd, client.MergeFrom(original)); patchErr != nil {
 		log.WithError(patchErr).Error("error updating DataDownload status")
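// FindDataDownloads relies on the convention, visible in Reconcile above, that
// the exposer labels the restore pod it creates with velerov1api.DataDownloadLabel
// set to the owning CR's name. A minimal sketch of that contract (pod spec
// details elided):
//
//	pod := &v1.Pod{
//		ObjectMeta: metav1.ObjectMeta{
//			Namespace: dd.Namespace, // findDataDownloadByPod reuses the pod namespace
//			Labels:    map[string]string{velerov1api.DataDownloadLabel: dd.Name},
//		},
//	}
//
// Pods without the label make findDataDownloadByPod return (nil, nil), so
// FindDataDownloads can safely list every pod in the namespace and skip the
// unrelated ones.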
diff --git a/pkg/controller/data_download_controller_test.go b/pkg/controller/data_download_controller_test.go
index 773112207a..9aeadd859a 100644
--- a/pkg/controller/data_download_controller_test.go
+++ b/pkg/controller/data_download_controller_test.go
@@ -550,6 +550,14 @@ func TestFindDataDownloadForPod(t *testing.T) {
 				assert.Equal(t, du.Namespace, requests[0].Namespace)
 				assert.Equal(t, du.Name, requests[0].Name)
 			},
+		}, {
+			name: "no selected label found for pod",
+			du:   dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhaseAccepted).Result(),
+			pod:  builder.ForPod(velerov1api.DefaultNamespace, dataDownloadName).Result(),
+			checkFunc: func(du *velerov2alpha1api.DataDownload, requests []reconcile.Request) {
+				// Assert that no requests are returned for an unlabeled pod
+				assert.Empty(t, requests)
+			},
 		}, {
 			name: "no matched pod",
 			du:   dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhaseAccepted).Result(),
@@ -559,7 +567,7 @@
 			},
 		},
 		{
-			name: "dataDownload not accepte",
+			name: "dataDownload not accepted",
 			du:   dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhaseInProgress).Result(),
 			pod:  builder.ForPod(velerov1api.DefaultNamespace, dataDownloadName).Labels(map[string]string{velerov1api.DataDownloadLabel: dataDownloadName}).Result(),
 			checkFunc: func(du *velerov2alpha1api.DataDownload, requests []reconcile.Request) {
diff --git a/pkg/controller/data_upload_controller.go b/pkg/controller/data_upload_controller.go
index 735026cda2..ebdb4a90fd 100644
--- a/pkg/controller/data_upload_controller.go
+++ b/pkg/controller/data_upload_controller.go
@@ -26,6 +26,7 @@ import (
 	corev1 "k8s.io/api/core/v1"
 	apierrors "k8s.io/apimachinery/pkg/api/errors"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/labels"
 	"k8s.io/apimachinery/pkg/types"
 	"k8s.io/client-go/kubernetes"
 	clocks "k8s.io/utils/clock"
@@ -61,7 +62,7 @@ type DataUploadReconciler struct {
 	kubeClient        kubernetes.Interface
 	csiSnapshotClient snapshotter.SnapshotV1Interface
 	repoEnsurer       *repository.Ensurer
-	clock             clocks.WithTickerAndDelayedExecution
+	Clock             clocks.WithTickerAndDelayedExecution
 	credentialGetter  *credentials.CredentialGetter
 	nodeName          string
 	fileSystem        filesystem.Interface
@@ -77,7 +78,7 @@ func NewDataUploadReconciler(client client.Client, kubeClient kubernetes.Interfa
 		client:            client,
 		kubeClient:        kubeClient,
 		csiSnapshotClient: csiSnapshotClient,
-		clock:             clock,
+		Clock:             clock,
 		credentialGetter:  cred,
 		nodeName:          nodeName,
 		fileSystem:        fs,
@@ -134,18 +135,29 @@ func (r *DataUploadReconciler) Reconcile(ctx context.Context, req ctrl.Request)
 		log.Info("Data upload is accepted")
 
+		if du.Spec.Cancel {
+			r.OnDataUploadCancelled(ctx, du.GetNamespace(), du.GetName())
+			return ctrl.Result{}, nil
+		}
+
 		exposeParam := r.setupExposeParam(&du)
 
 		if err := ep.Expose(ctx, getOwnerObject(&du), exposeParam); err != nil {
 			return r.errorOut(ctx, &du, err, "error to expose snapshot", log)
 		}
 
 		log.Info("Snapshot is exposed")
-		// ep.Expose() will trigger to create one pod whose volume is restored by a given volume snapshot,
+		// Expose() will trigger the creation of one pod whose volume is restored by a given volume snapshot,
 		// but the pod maybe is not in the same node of the current controller, so we need to return it here.
 		// And then only the controller who is in the same node could do the rest work.
 		return ctrl.Result{}, nil
 	} else if du.Status.Phase == velerov2alpha1api.DataUploadPhasePrepared {
 		log.Info("Data upload is prepared")
+
+		if du.Spec.Cancel {
+			r.OnDataUploadCancelled(ctx, du.GetNamespace(), du.GetName())
+			return ctrl.Result{}, nil
+		}
+
 		fsBackup := r.dataPathMgr.GetAsyncBR(du.Name)
 		if fsBackup != nil {
 			log.Info("Cancellable data path is already started")
@@ -183,7 +195,7 @@ func (r *DataUploadReconciler) Reconcile(ctx context.Context, req ctrl.Request)
 		// Update status to InProgress
 		original := du.DeepCopy()
 		du.Status.Phase = velerov2alpha1api.DataUploadPhaseInProgress
-		du.Status.StartTimestamp = &metav1.Time{Time: r.clock.Now()}
+		du.Status.StartTimestamp = &metav1.Time{Time: r.Clock.Now()}
 		if err := r.client.Patch(ctx, &du, client.MergeFrom(original)); err != nil {
 			return r.errorOut(ctx, &du, err, "error updating dataupload status", log)
 		}
@@ -277,7 +289,7 @@ func (r *DataUploadReconciler) OnDataUploadCompleted(ctx context.Context, namesp
 	du.Status.Path = result.Backup.Source.ByPath
 	du.Status.Phase = velerov2alpha1api.DataUploadPhaseCompleted
 	du.Status.SnapshotID = result.Backup.SnapshotID
-	du.Status.CompletionTimestamp = &metav1.Time{Time: r.clock.Now()}
+	du.Status.CompletionTimestamp = &metav1.Time{Time: r.Clock.Now()}
 	if result.Backup.EmptySnapshot {
 		du.Status.Message = "volume was empty so no data was upload"
 	}
@@ -331,9 +343,9 @@ func (r *DataUploadReconciler) OnDataUploadCancelled(ctx context.Context, namesp
 	original := du.DeepCopy()
 	du.Status.Phase = velerov2alpha1api.DataUploadPhaseCanceled
 	if du.Status.StartTimestamp.IsZero() {
-		du.Status.StartTimestamp = &metav1.Time{Time: r.clock.Now()}
+		du.Status.StartTimestamp = &metav1.Time{Time: r.Clock.Now()}
 	}
-	du.Status.CompletionTimestamp = &metav1.Time{Time: r.clock.Now()}
+	du.Status.CompletionTimestamp = &metav1.Time{Time: r.Clock.Now()}
 	if err := r.client.Patch(ctx, &du, client.MergeFrom(original)); err != nil {
 		log.WithError(err).Error("error updating DataUpload status")
 	}
@@ -399,16 +411,12 @@ func (r *DataUploadReconciler) SetupWithManager(mgr ctrl.Manager) error {
 
 func (r *DataUploadReconciler) findDataUploadForPod(podObj client.Object) []reconcile.Request {
 	pod := podObj.(*corev1.Pod)
-
-	du := &velerov2alpha1api.DataUpload{}
-	err := r.client.Get(context.Background(), types.NamespacedName{
-		Namespace: pod.Namespace,
-		Name:      pod.Labels[velerov1api.DataUploadLabel],
-	}, du)
-
+	du, err := r.findDataUploadByPod(*pod)
 	if err != nil {
 		r.logger.WithField("Backup pod", pod.Name).WithError(err).Error("unable to get dataupload")
 		return []reconcile.Request{}
+	} else if du == nil {
+		return []reconcile.Request{}
 	}
 
 	if du.Status.Phase != velerov2alpha1api.DataUploadPhaseAccepted {
@@ -430,6 +438,42 @@ func (r *DataUploadReconciler) findDataUploadForPod(podObj client.Object) []reco
 	return []reconcile.Request{requests}
 }
 
+func (r *DataUploadReconciler) findDataUploadByPod(pod corev1.Pod) (*velerov2alpha1api.DataUpload, error) {
+	if label, exist := pod.Labels[velerov1api.DataUploadLabel]; exist {
+		du := &velerov2alpha1api.DataUpload{}
+		err := r.client.Get(context.Background(), types.NamespacedName{
+			Namespace: pod.Namespace,
+			Name:      label,
+		}, du)
+
+		if err != nil {
+			return nil, errors.Wrapf(err, "failed to find DataUpload for pod %s/%s", pod.Namespace, pod.Name)
+		}
+		return du, nil
+	}
+	return nil, nil
+}
+
+func (r *DataUploadReconciler) FindDataUploads(ctx context.Context, cli client.Client, ns string) ([]*velerov2alpha1api.DataUpload, error) {
+	pods := &corev1.PodList{}
+	var dataUploads []*velerov2alpha1api.DataUpload
+	if err := cli.List(ctx, pods, &client.ListOptions{LabelSelector: labels.Everything(), Namespace: ns}); err != nil {
+		r.logger.WithError(errors.WithStack(err)).Errorf("failed to list pods in namespace %s", ns)
+		return nil, errors.Wrapf(err, "failed to list pods in namespace %s", ns)
+	}
+
+	for _, pod := range pods.Items {
+		du, err := r.findDataUploadByPod(pod)
+		if err != nil {
+			r.logger.WithError(errors.WithStack(err)).Errorf("failed to get DataUpload for pod %s/%s", pod.Namespace, pod.Name)
+			continue
+		} else if du != nil {
+			dataUploads = append(dataUploads, du)
+		}
+	}
+	return dataUploads, nil
+}
+
 func (r *DataUploadReconciler) patchDataUpload(ctx context.Context, req *velerov2alpha1api.DataUpload, mutate func(*velerov2alpha1api.DataUpload)) error {
 	original := req.DeepCopy()
 	mutate(req)
@@ -462,10 +506,10 @@ func (r *DataUploadReconciler) updateStatusToFailed(ctx context.Context, du *vel
 	du.Status.Phase = velerov2alpha1api.DataUploadPhaseFailed
 	du.Status.Message = errors.WithMessage(err, msg).Error()
 	if du.Status.StartTimestamp.IsZero() {
-		du.Status.StartTimestamp = &metav1.Time{Time: r.clock.Now()}
+		du.Status.StartTimestamp = &metav1.Time{Time: r.Clock.Now()}
 	}
-	du.Status.CompletionTimestamp = &metav1.Time{Time: r.clock.Now()}
+	du.Status.CompletionTimestamp = &metav1.Time{Time: r.Clock.Now()}
 	if patchErr := r.client.Patch(ctx, du, client.MergeFrom(original)); patchErr != nil {
 		log.WithError(patchErr).Error("error updating DataUpload status")
 	}
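// Taken together with the node-agent changes, the recovery flow after a
// node-agent restart is roughly this (call order only, a sketch rather than
// verbatim code):
//
//	r := controller.NewDataUploadReconciler(/* ... */)
//	s.markDataUploadsCancel(s.mgr.GetClient(), r) // FindDataUploads, then patch Spec.Cancel + Canceled status
//	_ = r.SetupWithManager(s.mgr)
//
// Any DataUpload that was Accepted or Prepared when the pod died is re-queued
// with Spec.Cancel set, and the new checks at the top of Reconcile route it to
// OnDataUploadCancelled instead of restarting the data path.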
diff --git a/pkg/controller/data_upload_controller_test.go b/pkg/controller/data_upload_controller_test.go
index 654e07531f..503c930f4f 100644
--- a/pkg/controller/data_upload_controller_test.go
+++ b/pkg/controller/data_upload_controller_test.go
@@ -334,7 +334,7 @@ func TestReconcile(t *testing.T) {
 			name:    "runCancelableDataUpload is concurrent limited",
 			dataMgr: datapath.NewManager(0),
 			pod:     builder.ForPod(velerov1api.DefaultNamespace, dataUploadName).Volumes(&corev1.Volume{Name: "dataupload-1"}).Result(),
-			du:      dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhasePrepared).SnapshotType(fakeSnapshotType).Cancel(true).Result(),
+			du:      dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhasePrepared).SnapshotType(fakeSnapshotType).Result(),
 			expectedProcessed: false,
 			expected:          dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhasePrepared).Result(),
 			expectedRequeue:   ctrl.Result{Requeue: true, RequeueAfter: time.Minute},
@@ -369,7 +369,7 @@ func TestReconcile(t *testing.T) {
 			}
 
 			if test.du.Spec.SnapshotType == fakeSnapshotType {
-				r.snapshotExposerList = map[velerov2alpha1api.SnapshotType]exposer.SnapshotExposer{fakeSnapshotType: &fakeSnapshotExposer{r.client, r.clock}}
+				r.snapshotExposerList = map[velerov2alpha1api.SnapshotType]exposer.SnapshotExposer{fakeSnapshotType: &fakeSnapshotExposer{r.client, r.Clock}}
 			} else if test.du.Spec.SnapshotType == velerov2alpha1api.SnapshotTypeCSI {
 				r.snapshotExposerList = map[velerov2alpha1api.SnapshotType]exposer.SnapshotExposer{velerov2alpha1api.SnapshotTypeCSI: exposer.NewCSISnapshotExposer(r.kubeClient, r.csiSnapshotClient, velerotest.NewLogger())}
 			}
@@ -378,7 +378,7 @@
 				return &fakeDataUploadFSBR{
 					du:         test.du,
 					kubeClient: r.client,
-					clock:      r.clock,
+					clock:      r.Clock,
 				}
 			}
 
@@ -569,6 +569,14 @@ func TestFindDataUploadForPod(t *testing.T) {
 				assert.Equal(t, du.Namespace, requests[0].Namespace)
 				assert.Equal(t, du.Name, requests[0].Name)
 			},
+		}, {
+			name: "no selected label found for pod",
+			du:   dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseAccepted).Result(),
+			pod:  builder.ForPod(velerov1api.DefaultNamespace, dataUploadName).Result(),
+			checkFunc: func(du *velerov2alpha1api.DataUpload, requests []reconcile.Request) {
+				// Assert that no requests are returned for an unlabeled pod
+				assert.Empty(t, requests)
+			},
 		}, {
 			name: "no matched pod",
 			du:   dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseAccepted).Result(),