Skip to content

Commit

Permalink
Mark dataupload datadownload status failed when velero pod restart
Browse files Browse the repository at this point in the history
Signed-off-by: Ming Qiu <mqiu@vmware.com>
  • Loading branch information
qiuming-best committed Jul 5, 2023
1 parent e54a8af commit 87b3d3f
Show file tree
Hide file tree
Showing 5 changed files with 228 additions and 8 deletions.
56 changes: 52 additions & 4 deletions pkg/cmd/cli/nodeagent/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ func (s *nodeAgentServer) run() {
s.metrics.RegisterAllMetrics()
s.metrics.InitPodVolumeMetricsForNode(s.nodeName)

s.markInProgressCRsFailed()
s.markCRsFailed()

s.logger.Info("Starting controllers")

Expand Down Expand Up @@ -318,9 +318,9 @@ func (s *nodeAgentServer) validatePodVolumesHostPath(client kubernetes.Interface
return nil
}

// if there is a restarting during the reconciling of pvbs/pvrs/etc, these CRs may be stuck in progress status
// markInProgressCRsFailed tries to mark the in progress CRs as failed when starting the server to avoid the issue
func (s *nodeAgentServer) markInProgressCRsFailed() {
// if there is a restarting during the reconciling of pvbs/pvrs/etc, these CRs may be stuck in progress/canceling status
// markCRsFailed tries to mark the in progress or canceling CRs as failed when starting the server to avoid the issue
func (s *nodeAgentServer) markCRsFailed() {
// the function is called before starting the controller manager, the embedded client isn't ready to use, so create a new one here
client, err := ctrlclient.New(s.mgr.GetConfig(), ctrlclient.Options{Scheme: s.mgr.GetScheme()})
if err != nil {
Expand All @@ -331,6 +331,54 @@ func (s *nodeAgentServer) markInProgressCRsFailed() {
s.markInProgressPVBsFailed(client)

s.markInProgressPVRsFailed(client)

s.markDataUploadsFailed(client)

s.markDataDownloadsFailed(client)
}

func (s *nodeAgentServer) markDataUploadsFailed(client ctrlclient.Client) {
duList := &velerov2alpha1api.DataUploadList{}
if err := client.List(s.ctx, duList, &ctrlclient.MatchingFields{"metadata.namespace": s.namespace}); err != nil {
s.logger.WithError(errors.WithStack(err)).Error("failed to list datauploads")
return
}
for i, du := range duList.Items {
if du.Status.Phase != velerov2alpha1api.DataUploadPhaseInProgress && du.Status.Phase != velerov2alpha1api.DataUploadPhaseCanceling {
s.logger.Debugf("the status of dataupload %q is %q, skip", du.GetName(), du.Status.Phase)
continue
}

if err := controller.UpdateDataUploadStatusToFailed(s.ctx, client, &duList.Items[i],
fmt.Sprintf("get a dataupload with status %q during the server starting, mark it as %q", du.Status.Phase, velerov2alpha1api.DataUploadPhaseFailed),
time.Now(), s.logger); err != nil {
s.logger.WithError(errors.WithStack(err)).Errorf("failed to patch dataupload %q", du.GetName())
continue
}
s.logger.WithField("dataupload", du.GetName()).Warn(du.Status.Message)
}
}

func (s *nodeAgentServer) markDataDownloadsFailed(client ctrlclient.Client) {
ddList := &velerov2alpha1api.DataDownloadList{}
if err := client.List(s.ctx, ddList, &ctrlclient.MatchingFields{"metadata.namespace": s.namespace}); err != nil {
s.logger.WithError(errors.WithStack(err)).Error("failed to list datadownloads")
return
}
for i, dd := range ddList.Items {
if dd.Status.Phase != velerov2alpha1api.DataDownloadPhaseInProgress && dd.Status.Phase != velerov2alpha1api.DataDownloadPhaseCanceling {
s.logger.Debugf("the status of datadownload %q is %q, skip", dd.GetName(), dd.Status.Phase)
continue
}

if err := controller.UpdateDataDownloadStatusToFailed(s.ctx, client, &ddList.Items[i],
fmt.Sprintf("get a datadownload with status %q during the server starting, mark it as %q", dd.Status.Phase, velerov2alpha1api.DataDownloadPhaseFailed),
time.Now(), s.logger); err != nil {
s.logger.WithError(errors.WithStack(err)).Errorf("failed to patch datadownload %q", dd.GetName())
continue
}
s.logger.WithField("datadownload", dd.GetName()).Warn(dd.Status.Message)
}
}

func (s *nodeAgentServer) markInProgressPVBsFailed(client ctrlclient.Client) {
Expand Down
22 changes: 22 additions & 0 deletions pkg/controller/data_download_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -491,3 +491,25 @@ func getDataDownloadOwnerObject(dd *velerov2alpha1api.DataDownload) v1.ObjectRef
APIVersion: dd.APIVersion,
}
}

func UpdateDataDownloadStatusToFailed(ctx context.Context, c client.Client, dd *velerov2alpha1api.DataDownload, msg string, time time.Time, log logrus.FieldLogger) error {
original := dd.DeepCopy()
dd.Status.Phase = velerov2alpha1api.DataDownloadPhaseFailed
dd.Status.Message = msg

if dd.Status.StartTimestamp.IsZero() {
dd.Status.StartTimestamp = &metav1.Time{Time: time}
}
dd.Status.CompletionTimestamp = &metav1.Time{Time: time}

err := c.Patch(ctx, dd, client.MergeFrom(original))
if err != nil {
if apierrors.IsConflict(err) {
log.WithField("Datadownload", dd.Name).Info("The status of this datadownload has been updated into failed by others node")
return nil
}
log.WithError(err).Error("error updating Datadownload status")
}

return err
}
61 changes: 61 additions & 0 deletions pkg/controller/data_download_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -580,3 +580,64 @@ func TestFindDataDownloadForPod(t *testing.T) {
}
}
}

func TestUpdateDataDownloadStatusToFailed(t *testing.T) {
ctx := context.TODO()
scheme := runtime.NewScheme()
require.NoError(t, velerov2alpha1api.AddToScheme(scheme))
tests := []struct {
name string
patchErr bool
patchConflict bool
}{
{
name: "patch success",
},
{
name: "patch error",
patchErr: true,
},
{
name: "patch success",
patchConflict: true,
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
fakeClient := &FakeClient{
Client: fake.NewClientBuilder().WithScheme(scheme).Build(),
}
if test.patchErr {
fakeClient.patchError = true
}
if test.patchConflict {
fakeClient.patchConflict = true
}
dd := dataDownloadBuilder().Result()
defer func() {
fakeClient.Delete(ctx, dd, &kbclient.DeleteOptions{})
}()

require.NoError(t, fakeClient.Create(ctx, dd))

msg := "Failed to patch download data status"
log := logrus.New()
err := UpdateDataDownloadStatusToFailed(ctx, fakeClient, dd, msg, time.Now(), log)

if test.patchErr {
assert.Equal(t, err.Error(), "Patch error")
} else {
assert.NoError(t, err)
}

if test.patchConflict {
assert.NoError(t, err)
}

assert.Equal(t, velerov2alpha1api.DataDownloadPhaseFailed, dd.Status.Phase)
assert.Equal(t, msg, dd.Status.Message)
assert.Equal(t, false, dd.Status.StartTimestamp.IsZero())
assert.Equal(t, false, dd.Status.CompletionTimestamp.IsZero())
})
}
}
21 changes: 21 additions & 0 deletions pkg/controller/data_upload_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -535,3 +535,24 @@ func getOwnerObject(du *velerov2alpha1api.DataUpload) corev1.ObjectReference {
APIVersion: du.APIVersion,
}
}

func UpdateDataUploadStatusToFailed(ctx context.Context, c client.Client, du *velerov2alpha1api.DataUpload, msg string, time time.Time, log logrus.FieldLogger) error {
original := du.DeepCopy()
du.Status.Phase = velerov2alpha1api.DataUploadPhaseFailed
du.Status.Message = msg

if du.Status.StartTimestamp.IsZero() {
du.Status.StartTimestamp = &metav1.Time{Time: time}
}
du.Status.CompletionTimestamp = &metav1.Time{Time: time}

err := c.Patch(ctx, du, client.MergeFrom(original))
if err != nil {
if apierrors.IsConflict(err) {
log.WithField("Dataupload", du.Name).Info("The status of this dataupload has been updated into failed by others node")
return nil
}
log.WithError(err).Error("error updating DataUpload status")
}
return err
}
76 changes: 72 additions & 4 deletions pkg/controller/data_upload_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,16 @@ import (

snapshotv1api "github.com/kubernetes-csi/external-snapshotter/client/v4/apis/volumesnapshot/v1"
snapshotFake "github.com/kubernetes-csi/external-snapshotter/client/v4/clientset/versioned/fake"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
clientgofake "k8s.io/client-go/kubernetes/fake"
"k8s.io/utils/clock"
Expand Down Expand Up @@ -58,10 +61,11 @@ const fakeSnapshotType velerov2alpha1api.SnapshotType = "fake-snapshot"

type FakeClient struct {
kbclient.Client
getError bool
createError bool
updateError bool
patchError bool
getError bool
createError bool
updateError bool
patchError bool
patchConflict bool
}

func (c *FakeClient) Get(ctx context.Context, key kbclient.ObjectKey, obj kbclient.Object) error {
Expand Down Expand Up @@ -91,6 +95,9 @@ func (c *FakeClient) Update(ctx context.Context, obj kbclient.Object, opts ...kb
func (c *FakeClient) Patch(ctx context.Context, obj kbclient.Object, patch kbclient.Patch, opts ...kbclient.PatchOption) error {
if c.patchError {
return fmt.Errorf("Patch error")
} else if c.patchConflict {
err := errors.New("Conflict error")
return apierrors.NewConflict(schema.GroupResource{Group: obj.GetObjectKind().GroupVersionKind().Group, Resource: ""}, obj.GetName(), err)
}

return c.Client.Patch(ctx, obj, patch, opts...)
Expand Down Expand Up @@ -599,3 +606,64 @@ func TestFindDataUploadForPod(t *testing.T) {
}
}
}

func TestUpdateDataUploadStatusToFailed(t *testing.T) {
ctx := context.TODO()
scheme := runtime.NewScheme()
require.NoError(t, velerov2alpha1api.AddToScheme(scheme))
tests := []struct {
name string
patchErr bool
patchConflict bool
}{
{
name: "patch success",
},
{
name: "patch error",
patchErr: true,
},
{
name: "patch success",
patchConflict: true,
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
fakeClient := &FakeClient{
Client: fake.NewClientBuilder().WithScheme(scheme).Build(),
}
if test.patchErr {
fakeClient.patchError = true
}
if test.patchConflict {
fakeClient.patchConflict = true
}
du := dataUploadBuilder().Result()
defer func() {
fakeClient.Delete(ctx, du, &kbclient.DeleteOptions{})
}()

require.NoError(t, fakeClient.Create(ctx, du))

msg := "Failed to patch download data status"
log := logrus.New()
err := UpdateDataUploadStatusToFailed(ctx, fakeClient, du, msg, time.Now(), log)

if test.patchErr {
assert.Equal(t, err.Error(), "Patch error")
} else {
assert.NoError(t, err)
}

if test.patchConflict {
assert.NoError(t, err)
}

assert.Equal(t, velerov2alpha1api.DataUploadPhaseFailed, du.Status.Phase)
assert.Equal(t, msg, du.Status.Message)
assert.Equal(t, false, du.Status.StartTimestamp.IsZero())
assert.Equal(t, false, du.Status.CompletionTimestamp.IsZero())
})
}
}

0 comments on commit 87b3d3f

Please sign in to comment.