From 87b3d3fea45a80925908cb59b6bfdd1584082bac Mon Sep 17 00:00:00 2001 From: Ming Qiu Date: Wed, 5 Jul 2023 15:55:42 +0800 Subject: [PATCH] Mark dataupload datadownload status failed when velero pod restart Signed-off-by: Ming Qiu --- pkg/cmd/cli/nodeagent/server.go | 56 +++++++++++++- pkg/controller/data_download_controller.go | 22 ++++++ .../data_download_controller_test.go | 61 +++++++++++++++ pkg/controller/data_upload_controller.go | 21 +++++ pkg/controller/data_upload_controller_test.go | 76 ++++++++++++++++++- 5 files changed, 228 insertions(+), 8 deletions(-) diff --git a/pkg/cmd/cli/nodeagent/server.go b/pkg/cmd/cli/nodeagent/server.go index 3f635602ed..52d7e44392 100644 --- a/pkg/cmd/cli/nodeagent/server.go +++ b/pkg/cmd/cli/nodeagent/server.go @@ -224,7 +224,7 @@ func (s *nodeAgentServer) run() { s.metrics.RegisterAllMetrics() s.metrics.InitPodVolumeMetricsForNode(s.nodeName) - s.markInProgressCRsFailed() + s.markCRsFailed() s.logger.Info("Starting controllers") @@ -318,9 +318,9 @@ func (s *nodeAgentServer) validatePodVolumesHostPath(client kubernetes.Interface return nil } -// if there is a restarting during the reconciling of pvbs/pvrs/etc, these CRs may be stuck in progress status -// markInProgressCRsFailed tries to mark the in progress CRs as failed when starting the server to avoid the issue -func (s *nodeAgentServer) markInProgressCRsFailed() { +// if there is a restarting during the reconciling of pvbs/pvrs/etc, these CRs may be stuck in progress/canceling status +// markCRsFailed tries to mark the in progress or canceling CRs as failed when starting the server to avoid the issue +func (s *nodeAgentServer) markCRsFailed() { // the function is called before starting the controller manager, the embedded client isn't ready to use, so create a new one here client, err := ctrlclient.New(s.mgr.GetConfig(), ctrlclient.Options{Scheme: s.mgr.GetScheme()}) if err != nil { @@ -331,6 +331,54 @@ func (s *nodeAgentServer) markInProgressCRsFailed() { 
s.markInProgressPVBsFailed(client) s.markInProgressPVRsFailed(client) + + s.markDataUploadsFailed(client) + + s.markDataDownloadsFailed(client) +} + +func (s *nodeAgentServer) markDataUploadsFailed(client ctrlclient.Client) { + duList := &velerov2alpha1api.DataUploadList{} + if err := client.List(s.ctx, duList, &ctrlclient.MatchingFields{"metadata.namespace": s.namespace}); err != nil { + s.logger.WithError(errors.WithStack(err)).Error("failed to list datauploads") + return + } + for i, du := range duList.Items { + if du.Status.Phase != velerov2alpha1api.DataUploadPhaseInProgress && du.Status.Phase != velerov2alpha1api.DataUploadPhaseCanceling { + s.logger.Debugf("the status of dataupload %q is %q, skip", du.GetName(), du.Status.Phase) + continue + } + + if err := controller.UpdateDataUploadStatusToFailed(s.ctx, client, &duList.Items[i], + fmt.Sprintf("get a dataupload with status %q during the server starting, mark it as %q", du.Status.Phase, velerov2alpha1api.DataUploadPhaseFailed), + time.Now(), s.logger); err != nil { + s.logger.WithError(errors.WithStack(err)).Errorf("failed to patch dataupload %q", du.GetName()) + continue + } + s.logger.WithField("dataupload", du.GetName()).Warn(du.Status.Message) + } +} + +func (s *nodeAgentServer) markDataDownloadsFailed(client ctrlclient.Client) { + ddList := &velerov2alpha1api.DataDownloadList{} + if err := client.List(s.ctx, ddList, &ctrlclient.MatchingFields{"metadata.namespace": s.namespace}); err != nil { + s.logger.WithError(errors.WithStack(err)).Error("failed to list datadownloads") + return + } + for i, dd := range ddList.Items { + if dd.Status.Phase != velerov2alpha1api.DataDownloadPhaseInProgress && dd.Status.Phase != velerov2alpha1api.DataDownloadPhaseCanceling { + s.logger.Debugf("the status of datadownload %q is %q, skip", dd.GetName(), dd.Status.Phase) + continue + } + + if err := controller.UpdateDataDownloadStatusToFailed(s.ctx, client, &ddList.Items[i], + fmt.Sprintf("get a datadownload with status %q 
during the server starting, mark it as %q", dd.Status.Phase, velerov2alpha1api.DataDownloadPhaseFailed), + time.Now(), s.logger); err != nil { + s.logger.WithError(errors.WithStack(err)).Errorf("failed to patch datadownload %q", dd.GetName()) + continue + } + s.logger.WithField("datadownload", dd.GetName()).Warn(dd.Status.Message) + } } func (s *nodeAgentServer) markInProgressPVBsFailed(client ctrlclient.Client) { diff --git a/pkg/controller/data_download_controller.go b/pkg/controller/data_download_controller.go index 4750cb0edb..dd3a6ccee4 100644 --- a/pkg/controller/data_download_controller.go +++ b/pkg/controller/data_download_controller.go @@ -491,3 +491,25 @@ func getDataDownloadOwnerObject(dd *velerov2alpha1api.DataDownload) v1.ObjectRef APIVersion: dd.APIVersion, } } + +func UpdateDataDownloadStatusToFailed(ctx context.Context, c client.Client, dd *velerov2alpha1api.DataDownload, msg string, time time.Time, log logrus.FieldLogger) error { + original := dd.DeepCopy() + dd.Status.Phase = velerov2alpha1api.DataDownloadPhaseFailed + dd.Status.Message = msg + + if dd.Status.StartTimestamp.IsZero() { + dd.Status.StartTimestamp = &metav1.Time{Time: time} + } + dd.Status.CompletionTimestamp = &metav1.Time{Time: time} + + err := c.Patch(ctx, dd, client.MergeFrom(original)) + if err != nil { + if apierrors.IsConflict(err) { + log.WithField("Datadownload", dd.Name).Info("The status of this datadownload has been updated to failed by another node") + return nil + } + log.WithError(err).Error("error updating Datadownload status") + } + + return err +} diff --git a/pkg/controller/data_download_controller_test.go b/pkg/controller/data_download_controller_test.go index 773112207a..24774f11c6 100644 --- a/pkg/controller/data_download_controller_test.go +++ b/pkg/controller/data_download_controller_test.go @@ -580,3 +580,64 @@ func TestFindDataDownloadForPod(t *testing.T) { } } } + +func TestUpdateDataDownloadStatusToFailed(t *testing.T) { + ctx := context.TODO() + scheme := 
runtime.NewScheme() + require.NoError(t, velerov2alpha1api.AddToScheme(scheme)) + tests := []struct { + name string + patchErr bool + patchConflict bool + }{ + { + name: "patch success", + }, + { + name: "patch error", + patchErr: true, + }, + { + name: "patch conflict", + patchConflict: true, + }, + } + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + fakeClient := &FakeClient{ + Client: fake.NewClientBuilder().WithScheme(scheme).Build(), + } + if test.patchErr { + fakeClient.patchError = true + } + if test.patchConflict { + fakeClient.patchConflict = true + } + dd := dataDownloadBuilder().Result() + defer func() { + fakeClient.Delete(ctx, dd, &kbclient.DeleteOptions{}) + }() + + require.NoError(t, fakeClient.Create(ctx, dd)) + + msg := "Failed to patch data download status" + log := logrus.New() + err := UpdateDataDownloadStatusToFailed(ctx, fakeClient, dd, msg, time.Now(), log) + + if test.patchErr { + assert.Equal(t, "Patch error", err.Error()) + } else { + assert.NoError(t, err) + } + + if test.patchConflict { + assert.NoError(t, err) + } + + assert.Equal(t, velerov2alpha1api.DataDownloadPhaseFailed, dd.Status.Phase) + assert.Equal(t, msg, dd.Status.Message) + assert.Equal(t, false, dd.Status.StartTimestamp.IsZero()) + assert.Equal(t, false, dd.Status.CompletionTimestamp.IsZero()) + }) + } +} diff --git a/pkg/controller/data_upload_controller.go b/pkg/controller/data_upload_controller.go index 735026cda2..eaad9b0879 100644 --- a/pkg/controller/data_upload_controller.go +++ b/pkg/controller/data_upload_controller.go @@ -535,3 +535,24 @@ func getOwnerObject(du *velerov2alpha1api.DataUpload) corev1.ObjectReference { APIVersion: du.APIVersion, } } + +func UpdateDataUploadStatusToFailed(ctx context.Context, c client.Client, du *velerov2alpha1api.DataUpload, msg string, time time.Time, log logrus.FieldLogger) error { + original := du.DeepCopy() + du.Status.Phase = velerov2alpha1api.DataUploadPhaseFailed + du.Status.Message = msg + + if 
du.Status.StartTimestamp.IsZero() { + du.Status.StartTimestamp = &metav1.Time{Time: time} + } + du.Status.CompletionTimestamp = &metav1.Time{Time: time} + + err := c.Patch(ctx, du, client.MergeFrom(original)) + if err != nil { + if apierrors.IsConflict(err) { + log.WithField("Dataupload", du.Name).Info("The status of this dataupload has been updated to failed by another node") + return nil + } + log.WithError(err).Error("error updating DataUpload status") + } + return err +} diff --git a/pkg/controller/data_upload_controller_test.go b/pkg/controller/data_upload_controller_test.go index 654e07531f..898c4e79c9 100644 --- a/pkg/controller/data_upload_controller_test.go +++ b/pkg/controller/data_upload_controller_test.go @@ -24,13 +24,16 @@ import ( snapshotv1api "github.com/kubernetes-csi/external-snapshotter/client/v4/apis/volumesnapshot/v1" snapshotFake "github.com/kubernetes-csi/external-snapshotter/client/v4/clientset/versioned/fake" + "github.com/pkg/errors" "github.com/sirupsen/logrus" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" corev1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/types" clientgofake "k8s.io/client-go/kubernetes/fake" "k8s.io/utils/clock" @@ -58,10 +61,11 @@ const fakeSnapshotType velerov2alpha1api.SnapshotType = "fake-snapshot" type FakeClient struct { kbclient.Client - getError bool - createError bool - updateError bool - patchError bool + getError bool + createError bool + updateError bool + patchError bool + patchConflict bool } func (c *FakeClient) Get(ctx context.Context, key kbclient.ObjectKey, obj kbclient.Object) error { @@ -91,6 +95,9 @@ func (c *FakeClient) Update(ctx context.Context, obj kbclient.Object, opts ...kb func (c *FakeClient) Patch(ctx context.Context, obj kbclient.Object, patch 
kbclient.Patch, opts ...kbclient.PatchOption) error { if c.patchError { return fmt.Errorf("Patch error") + } else if c.patchConflict { + err := errors.New("Conflict error") + return apierrors.NewConflict(schema.GroupResource{Group: obj.GetObjectKind().GroupVersionKind().Group, Resource: ""}, obj.GetName(), err) } return c.Client.Patch(ctx, obj, patch, opts...) @@ -599,3 +606,64 @@ func TestFindDataUploadForPod(t *testing.T) { } } } + +func TestUpdateDataUploadStatusToFailed(t *testing.T) { + ctx := context.TODO() + scheme := runtime.NewScheme() + require.NoError(t, velerov2alpha1api.AddToScheme(scheme)) + tests := []struct { + name string + patchErr bool + patchConflict bool + }{ + { + name: "patch success", + }, + { + name: "patch error", + patchErr: true, + }, + { + name: "patch conflict", + patchConflict: true, + }, + } + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + fakeClient := &FakeClient{ + Client: fake.NewClientBuilder().WithScheme(scheme).Build(), + } + if test.patchErr { + fakeClient.patchError = true + } + if test.patchConflict { + fakeClient.patchConflict = true + } + du := dataUploadBuilder().Result() + defer func() { + fakeClient.Delete(ctx, du, &kbclient.DeleteOptions{}) + }() + + require.NoError(t, fakeClient.Create(ctx, du)) + + msg := "Failed to patch data upload status" + log := logrus.New() + err := UpdateDataUploadStatusToFailed(ctx, fakeClient, du, msg, time.Now(), log) + + if test.patchErr { + assert.Equal(t, "Patch error", err.Error()) + } else { + assert.NoError(t, err) + } + + if test.patchConflict { + assert.NoError(t, err) + } + + assert.Equal(t, velerov2alpha1api.DataUploadPhaseFailed, du.Status.Phase) + assert.Equal(t, msg, du.Status.Message) + assert.Equal(t, false, du.Status.StartTimestamp.IsZero()) + assert.Equal(t, false, du.Status.CompletionTimestamp.IsZero()) + }) + } +}