Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add AnnVolumeSnapshotBeingCreated #261

Merged
merged 10 commits into from
Apr 3, 2020
14 changes: 8 additions & 6 deletions pkg/sidecar-controller/content_create_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,11 @@ func TestSyncContent(t *testing.T) {

tests := []controllerTest{
{
name: "1-1: Basic content update ready to use",
initialContents: newContentArrayWithReadyToUse("content1-1", "snapuid1-1", "snap1-1", "sid1-1", defaultClass, "", "volume-handle-1-1", retainPolicy, nil, &defaultSize, &False, true),
expectedContents: newContentArrayWithReadyToUse("content1-1", "snapuid1-1", "snap1-1", "sid1-1", defaultClass, "", "volume-handle-1-1", retainPolicy, nil, &defaultSize, &True, true),
expectedEvents: noevents,
name: "1-1: Basic content update ready to use",
initialContents: newContentArrayWithReadyToUse("content1-1", "snapuid1-1", "snap1-1", "sid1-1", defaultClass, "", "volume-handle-1-1", retainPolicy, nil, &defaultSize, &False, true),
expectedContents: withContentAnnotations(newContentArrayWithReadyToUse("content1-1", "snapuid1-1", "snap1-1", "sid1-1", defaultClass, "", "volume-handle-1-1", retainPolicy, nil, &defaultSize, &True, true),
map[string]string{}),
expectedEvents: noevents,
expectedCreateCalls: []createCall{
{
volumeHandle: "volume-handle-1-1",
Expand All @@ -52,8 +53,9 @@ func TestSyncContent(t *testing.T) {
name: "1-2: Basic sync content create snapshot",
initialContents: withContentStatus(newContentArray("content1-2", "snapuid1-2", "snap1-2", "sid1-2", defaultClass, "", "volume-handle-1-2", retainPolicy, nil, &defaultSize, true),
nil),
expectedContents: withContentStatus(newContentArray("content1-2", "snapuid1-2", "snap1-2", "sid1-2", defaultClass, "", "volume-handle-1-2", retainPolicy, nil, &defaultSize, true),
expectedContents: withContentAnnotations(withContentStatus(newContentArray("content1-2", "snapuid1-2", "snap1-2", "sid1-2", defaultClass, "", "volume-handle-1-2", retainPolicy, nil, &defaultSize, true),
&crdv1.VolumeSnapshotContentStatus{SnapshotHandle: toStringPointer("snapuid1-2"), RestoreSize: &defaultSize, ReadyToUse: &True}),
map[string]string{}),
expectedEvents: noevents,
expectedCreateCalls: []createCall{
{
Expand Down Expand Up @@ -161,7 +163,7 @@ func TestSyncContent(t *testing.T) {
SnapshotHandle: toStringPointer("sid1-6"),
RestoreSize: &defaultSize,
ReadyToUse: &False,
Error: newSnapshotError("Failed to check and update snapshot content: failed to get input parameters to create snapshot content1-6: \"failed to retrieve snapshot class bad-class from the informer: \\\"volumesnapshotclass.snapshot.storage.k8s.io \\\\\\\"bad-class\\\\\\\" not found\\\"\""),
Error: newSnapshotError("Failed to check and update snapshot content: failed to get input parameters to create snapshot for content content1-6: \"failed to retrieve snapshot class bad-class from the informer: \\\"volumesnapshotclass.snapshot.storage.k8s.io \\\\\\\"bad-class\\\\\\\" not found\\\"\""),
}),
expectedEvents: []string{"Warning SnapshotContentCheckandUpdateFailed"},
expectedCreateCalls: []createCall{
Expand Down
200 changes: 155 additions & 45 deletions pkg/sidecar-controller/snapshot_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import (

crdv1 "github.com/kubernetes-csi/external-snapshotter/v2/pkg/apis/volumesnapshot/v1beta1"
"github.com/kubernetes-csi/external-snapshotter/v2/pkg/utils"
codes "google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/klog"
Expand Down Expand Up @@ -81,6 +83,11 @@ func (ctrl *csiSnapshotSideCarController) syncContent(content *crdv1.VolumeSnaps
// or ListSnapshots CSI methods over and over again for
// performance reasons.
if content.Status != nil && content.Status.ReadyToUse != nil && *content.Status.ReadyToUse == true {
// Try to remove AnnVolumeSnapshotBeingCreated if it is not removed yet for some reason
err := ctrl.removeAnnVolumeSnapshotBeingCreated(content)
if err != nil {
return fmt.Errorf("failed to remove VolumeSnapshotBeingCreated annotation from the content %s: %q", content.Name, err)
}
return nil
}
ctrl.checkandUpdateContentStatus(content)
Expand Down Expand Up @@ -126,10 +133,17 @@ func (ctrl *csiSnapshotSideCarController) createSnapshot(content *crdv1.VolumeSn
klog.V(5).Infof("createSnapshot for content [%s]: started", content.Name)
opName := fmt.Sprintf("create-%s", content.Name)
xing-yang marked this conversation as resolved.
Show resolved Hide resolved
ctrl.scheduleOperation(opName, func() error {
contentObj, err := ctrl.createSnapshotOperation(content)
// content.Status will be created for the first time after a snapshot
// is created by the CSI driver. If content.Status is not nil,
// we should update content status without creating snapshot again.
if content.Status != nil && content.Status.Error != nil && content.Status.Error.Message != nil && !isControllerUpdateFailError(content.Status.Error) {
klog.V(4).Infof("error is already set in snapshot, do not retry to create: %s", *content.Status.Error.Message)
return nil
}
contentObj, err := ctrl.createSnapshotWrapper(content)
if err != nil {
ctrl.updateContentErrorStatusWithEvent(content, v1.EventTypeWarning, "SnapshotCreationFailed", fmt.Sprintf("Failed to create snapshot: %v", err))
klog.Errorf("createSnapshot [%s]: error occurred in createSnapshotOperation: %v", opName, err)
klog.Errorf("createSnapshot [%s]: error occurred in createSnapshotWrapper: %v", opName, err)
return err
}

Expand Down Expand Up @@ -276,75 +290,80 @@ func (ctrl *csiSnapshotSideCarController) checkandUpdateContentStatusOperation(c
}
driverName = content.Spec.Driver
snapshotID = *content.Spec.Source.SnapshotHandle
} else {
class, snapshotterCredentials, err := ctrl.getCSISnapshotInput(content)
if err != nil {
return nil, fmt.Errorf("failed to get input parameters to create snapshot %s: %q", content.Name, err)

klog.V(5).Infof("checkandUpdateContentStatusOperation: driver %s, snapshotId %s, creationTime %v, size %d, readyToUse %t", driverName, snapshotID, creationTime, size, readyToUse)

if creationTime.IsZero() {
creationTime = time.Now()
}

driverName, snapshotID, creationTime, size, readyToUse, err = ctrl.handler.CreateSnapshot(content, class.Parameters, snapshotterCredentials)
updatedContent, err := ctrl.updateSnapshotContentStatus(content, snapshotID, readyToUse, creationTime.UnixNano(), size)
if err != nil {
klog.Errorf("checkandUpdateContentStatusOperation: failed to call create snapshot to check whether the snapshot is ready to use %q", err)
return nil, err
}
return updatedContent, nil
} else {
return ctrl.createSnapshotWrapper(content)
}
klog.V(5).Infof("checkandUpdateContentStatusOperation: driver %s, snapshotId %s, creationTime %v, size %d, readyToUse %t", driverName, snapshotID, creationTime, size, readyToUse)

if creationTime.IsZero() {
creationTime = time.Now()
}

updateContent, err := ctrl.updateSnapshotContentStatus(content, snapshotID, readyToUse, creationTime.UnixNano(), size)
if err != nil {
return nil, err
}
return updateContent, nil
}

// The function goes through the whole snapshot creation process.
// 1. Trigger the snapshot through csi storage provider.
// 2. Update VolumeSnapshot status with creationtimestamp information
// 3. Create the VolumeSnapshotContent object with the snapshot id information.
// 4. Bind the VolumeSnapshot and VolumeSnapshotContent object
func (ctrl *csiSnapshotSideCarController) createSnapshotOperation(content *crdv1.VolumeSnapshotContent) (*crdv1.VolumeSnapshotContent, error) {
klog.Infof("createSnapshotOperation: Creating snapshot for content %s through the plugin ...", content.Name)

// content.Status will be created for the first time after a snapshot
// is created by the CSI driver. If content.Status is not nil,
// we should update content status without creating snapshot again.
if content.Status != nil && content.Status.Error != nil && content.Status.Error.Message != nil && !isControllerUpdateFailError(content.Status.Error) {
klog.V(4).Infof("error is already set in snapshot, do not retry to create: %s", *content.Status.Error.Message)
return content, nil
}
// This is a wrapper function for the snapshot creation process.
func (ctrl *csiSnapshotSideCarController) createSnapshotWrapper(content *crdv1.VolumeSnapshotContent) (*crdv1.VolumeSnapshotContent, error) {
klog.Infof("createSnapshotWrapper: Creating snapshot for content %s through the plugin ...", content.Name)

xing-yang marked this conversation as resolved.
Show resolved Hide resolved
class, snapshotterCredentials, err := ctrl.getCSISnapshotInput(content)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added error status check in createSnapshotWrapper. Can you check?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This "https://github.com/kubernetes-csi/external-snapshotter/pull/261/files#r397544373" does not seem to be the spot that you are referring to. My understanding is that you want me to add error status check in createSnapshotWrapper. If it is not the case, please clarify.

if err != nil {
return nil, fmt.Errorf("failed to get input parameters to create snapshot for content %s: %q", content.Name, err)
}

// NOTE(xyang): handle create timeout
// Add an annotation to indicate the snapshot creation request has been
// sent to the storage system and the controller is waiting for a response.
// The annotation will be removed after the storage system has responded with
// success or permanent failure. If the request times out, annotation will
// remain on the content to avoid potential leaking of a snapshot resource on
// the storage system.
err = ctrl.setAnnVolumeSnapshotBeingCreated(content)
xing-yang marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return nil, fmt.Errorf("failed to add VolumeSnapshotBeingCreated annotation on the content %s: %q", content.Name, err)
}

driverName, snapshotID, creationTime, size, readyToUse, err := ctrl.handler.CreateSnapshot(content, class.Parameters, snapshotterCredentials)
if err != nil {
// NOTE(xyang): handle create timeout
// If it is a final error, remove annotation to indicate
// storage system has responded with an error
klog.Infof("createSnapshotWrapper: CreateSnapshot for content %s returned error: %v", content.Name, err)
if isCSIFinalError(err) {
err = ctrl.removeAnnVolumeSnapshotBeingCreated(content)
if err != nil {
return nil, fmt.Errorf("failed to remove VolumeSnapshotBeingCreated annotation from the content %s: %q", content.Name, err)
}
}

return nil, fmt.Errorf("failed to take snapshot of the volume, %s: %q", *content.Spec.Source.VolumeHandle, err)
xing-yang marked this conversation as resolved.
Show resolved Hide resolved
}
if driverName != class.Driver {
return nil, fmt.Errorf("failed to take snapshot of the volume, %s: driver name %s returned from the driver is different from driver %s in snapshot class", *content.Spec.Source.VolumeHandle, driverName, class.Driver)
}

klog.V(5).Infof("Created snapshot: driver %s, snapshotId %s, creationTime %v, size %d, readyToUse %t", driverName, snapshotID, creationTime, size, readyToUse)

timestamp := creationTime.UnixNano()
newContent, err := ctrl.updateSnapshotContentStatus(content, snapshotID, readyToUse, timestamp, size)
if creationTime.IsZero() {
creationTime = time.Now()
}

newContent, err := ctrl.updateSnapshotContentStatus(content, snapshotID, readyToUse, creationTime.UnixNano(), size)
if err != nil {
strerr := fmt.Sprintf("error updating volume snapshot content status for snapshot %s: %v.", content.Name, err)
klog.Error(strerr)
klog.Errorf("error updating status for volume snapshot content %s: %v.", content.Name, err)
return nil, fmt.Errorf("error updating status for volume snapshot content %s: %v.", content.Name, err)
} else {
content = newContent
}

// Update content in the cache store
_, err = ctrl.storeContentUpdate(content)
// NOTE(xyang): handle create timeout
// Remove annotation to indicate storage system has successfully
// cut the snapshot
err = ctrl.removeAnnVolumeSnapshotBeingCreated(content)
xing-yang marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
klog.Errorf("failed to update content store %v", err)
return nil, fmt.Errorf("failed to remove VolumeSnapshotBeingCreated annotation on the content %s: %q", content.Name, err)
}

return content, nil
Expand Down Expand Up @@ -564,9 +583,100 @@ func (ctrl *csiSnapshotSideCarController) shouldDelete(content *crdv1.VolumeSnap
if content.Spec.Source.SnapshotHandle != nil && content.Spec.VolumeSnapshotRef.UID == "" {
return true
}
// 2) shouldDelete returns true if AnnVolumeSnapshotBeingDeleted annotation is set

// NOTE(xyang): Handle create snapshot timeout
// 2) shouldDelete returns false if AnnVolumeSnapshotBeingCreated
// annotation is set. This indicates a CreateSnapshot CSI RPC has
// not responded with success or failure.
// We need to keep waiting for a response from the CSI driver.
if metav1.HasAnnotation(content.ObjectMeta, utils.AnnVolumeSnapshotBeingCreated) {
xing-yang marked this conversation as resolved.
Show resolved Hide resolved
return false
}

// 3) shouldDelete returns true if AnnVolumeSnapshotBeingDeleted annotation is set
if metav1.HasAnnotation(content.ObjectMeta, utils.AnnVolumeSnapshotBeingDeleted) {
return true
}
return false
}

// setAnnVolumeSnapshotBeingCreated sets VolumeSnapshotBeingCreated annotation
// on VolumeSnapshotContent
// If set, it indicates snapshot is being created
func (ctrl *csiSnapshotSideCarController) setAnnVolumeSnapshotBeingCreated(content *crdv1.VolumeSnapshotContent) error {
if metav1.HasAnnotation(content.ObjectMeta, utils.AnnVolumeSnapshotBeingCreated) {
// the annotation already exists, return directly
return nil
}

// Set AnnVolumeSnapshotBeingCreated
klog.V(5).Infof("setAnnVolumeSnapshotBeingCreated: set annotation [%s:yes] on content [%s].", utils.AnnVolumeSnapshotBeingCreated, content.Name)
contentClone := content.DeepCopy()
metav1.SetMetaDataAnnotation(&contentClone.ObjectMeta, utils.AnnVolumeSnapshotBeingCreated, "yes")

updatedContent, err := ctrl.clientset.SnapshotV1beta1().VolumeSnapshotContents().Update(contentClone)
if err != nil {
return newControllerUpdateError(content.Name, err.Error())
}
// update content if update is successful
content = updatedContent

_, err = ctrl.storeContentUpdate(content)
if err != nil {
klog.V(4).Infof("setAnnVolumeSnapshotBeingCreated for content [%s]: cannot update internal cache %v", content.Name, err)
return err
xing-yang marked this conversation as resolved.
Show resolved Hide resolved
}
klog.V(5).Infof("setAnnVolumeSnapshotBeingCreated: volume snapshot content %+v", content)

return nil
}

// removeAnnVolumeSnapshotBeingCreated removes the VolumeSnapshotBeingCreated
// annotation from a content if there exists one.
func (ctrl csiSnapshotSideCarController) removeAnnVolumeSnapshotBeingCreated(content *crdv1.VolumeSnapshotContent) error {
if !metav1.HasAnnotation(content.ObjectMeta, utils.AnnVolumeSnapshotBeingCreated) {
// the annotation does not exist, return directly
return nil
}
contentClone := content.DeepCopy()
delete(contentClone.ObjectMeta.Annotations, utils.AnnVolumeSnapshotBeingCreated)

updatedContent, err := ctrl.clientset.SnapshotV1beta1().VolumeSnapshotContents().Update(contentClone)
if err != nil {
return newControllerUpdateError(content.Name, err.Error())
}
// update content if update is successful
content = updatedContent

klog.V(5).Infof("Removed VolumeSnapshotBeingCreated annotation from volume snapshot content %s", content.Name)
_, err = ctrl.storeContentUpdate(content)
if err != nil {
klog.Errorf("failed to update content store %v", err)
}
return nil
}

// This function checks if the error is final
func isCSIFinalError(err error) bool {
// Sources:
// https://github.com/grpc/grpc/blob/master/doc/statuscodes.md
// https://github.com/container-storage-interface/spec/blob/master/spec.md
st, ok := status.FromError(err)
if !ok {
// This is not gRPC error. The operation must have failed before gRPC
// method was called, otherwise we would get gRPC error.
// We don't know if any previous CreateSnapshot is in progress, be on the safe side.
return false
}
switch st.Code() {
case codes.Canceled, // gRPC: Client Application cancelled the request
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I checked the documented RPC error code here
It looks only three types have been documented by CSI spec. ResourceExhausted, Aborted -> may retry later, Already_Existed -> should fail.
are we sure the rest are supported? If yes, should the CSI spec be fixed?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

CSI spec only documents error codes directly returned by the CSI driver. Error codes such as DeadlineExceeded comes from the gRPC layer when timeout happens, not directly returned by the CSI driver. That's why not all gRPC error codes are documented in CSI spec.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are concerns regarding the gRPC error codes being re-used in the CSI spec. See issue here: container-storage-interface/spec#419. That is a bigger design issue. We can modify these error codes when they are changed in the CSI spec.

codes.DeadlineExceeded, // gRPC: Timeout
codes.Unavailable, // gRPC: Server shutting down, TCP connection broken - previous CreateSnapshot() may be still in progress.
codes.ResourceExhausted, // gRPC: Server temporarily out of resources - previous CreateSnapshot() may be still in progress.
codes.Aborted: // CSI: Operation pending for Snapshot
return false
}
// All other errors mean that creating snapshot either did not
// even start or failed. It is for sure not in progress.
return true
}
13 changes: 13 additions & 0 deletions pkg/utils/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,19 @@ const (
// backing the snapshot content.
AnnVolumeSnapshotBeingDeleted = "snapshot.storage.kubernetes.io/volumesnapshot-being-deleted"

// AnnVolumeSnapshotBeingCreated annotation applies to VolumeSnapshotContents.
// If it is set, it indicates that the csi-snapshotter
// sidecar has sent the create snapshot request to the storage system and
// is waiting for a response of success or failure.
// This annotation will be removed once the driver's CreateSnapshot
// CSI function returns success or a final error (determined by isFinalError()).
// If the create snapshot request fails with a non-final error such as timeout,
// retry will happen and the annotation will remain.
// This only applies to dynamic provisioning of snapshots because
// the create snapshot CSI method will not be called for pre-provisioned
// snapshots.
AnnVolumeSnapshotBeingCreated = "snapshot.storage.kubernetes.io/volumesnapshot-being-created"
xing-yang marked this conversation as resolved.
Show resolved Hide resolved

// Annotation for secret name and namespace will be added to the content
// and used at snapshot content deletion time.
AnnDeletionSecretRefName = "snapshot.storage.kubernetes.io/deletion-secret-name"
Expand Down