diff --git a/client/pluginmanager/csimanager/volume.go b/client/pluginmanager/csimanager/volume.go index 2f4a5c78225..4f882a2a7d0 100644 --- a/client/pluginmanager/csimanager/volume.go +++ b/client/pluginmanager/csimanager/volume.go @@ -163,14 +163,18 @@ func (v *volumeManager) stageVolume(ctx context.Context, vol *structs.CSIVolume, return err } + req := &csi.NodeStageVolumeRequest{ + ExternalID: vol.RemoteID(), + PublishContext: publishContext, + StagingTargetPath: pluginStagingPath, + VolumeCapability: capability, + Secrets: vol.Secrets, + VolumeContext: vol.Context, + } + // CSI NodeStageVolume errors for timeout, codes.Unavailable and // codes.ResourceExhausted are retried; all other errors are fatal. - return v.plugin.NodeStageVolume(ctx, - vol.RemoteID(), - publishContext, - pluginStagingPath, - capability, - vol.Secrets, + return v.plugin.NodeStageVolume(ctx, req, grpc_retry.WithPerRetryTimeout(DefaultMountActionTimeout), grpc_retry.WithMax(3), grpc_retry.WithBackoff(grpc_retry.BackoffExponential(100*time.Millisecond)), @@ -210,6 +214,7 @@ func (v *volumeManager) publishVolume(ctx context.Context, vol *structs.CSIVolum VolumeCapability: capabilities, Readonly: usage.ReadOnly, Secrets: vol.Secrets, + VolumeContext: vol.Context, }, grpc_retry.WithPerRetryTimeout(DefaultMountActionTimeout), grpc_retry.WithMax(3), diff --git a/plugins/csi/client.go b/plugins/csi/client.go index a8e06c71453..a44e5b1611f 100644 --- a/plugins/csi/client.go +++ b/plugins/csi/client.go @@ -12,7 +12,6 @@ import ( multierror "github.com/hashicorp/go-multierror" "github.com/hashicorp/nomad/helper" "github.com/hashicorp/nomad/helper/grpc-middleware/logging" - "github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/nomad/plugins/base" "github.com/hashicorp/nomad/plugins/shared/hclspec" "google.golang.org/grpc" @@ -496,46 +495,33 @@ func (c *client) NodeGetInfo(ctx context.Context) (*NodeGetInfoResponse, error) return result, nil } -func (c *client) NodeStageVolume(ctx context.Context, volumeID string, publishContext map[string]string, stagingTargetPath string, capabilities *VolumeCapability, secrets structs.CSISecrets, opts ...grpc.CallOption) error { +func (c *client) NodeStageVolume(ctx context.Context, req *NodeStageVolumeRequest, opts ...grpc.CallOption) error { if c == nil { return fmt.Errorf("Client not initialized") } if c.nodeClient == nil { return fmt.Errorf("Client not initialized") } - - // These errors should not be returned during production use but exist as aids - // during Nomad development - if volumeID == "" { - return fmt.Errorf("missing volumeID") - } - if stagingTargetPath == "" { - return fmt.Errorf("missing stagingTargetPath") - } - - req := &csipbv1.NodeStageVolumeRequest{ - VolumeId: volumeID, - PublishContext: publishContext, - StagingTargetPath: stagingTargetPath, - VolumeCapability: capabilities.ToCSIRepresentation(), - Secrets: secrets, + err := req.Validate() + if err != nil { + return err } // NodeStageVolume's response contains no extra data. If err == nil, we were // successful. - _, err := c.nodeClient.NodeStageVolume(ctx, req, opts...) + _, err = c.nodeClient.NodeStageVolume(ctx, req.ToCSIRepresentation(), opts...) if err != nil { code := status.Code(err) switch code { case codes.NotFound: - err = fmt.Errorf("volume %q could not be found: %v", volumeID, err) + err = fmt.Errorf("volume %q could not be found: %v", req.ExternalID, err) case codes.AlreadyExists: err = fmt.Errorf( "volume %q is already staged to %q but with incompatible capabilities for this request: %v", - volumeID, stagingTargetPath, err) + req.ExternalID, req.StagingTargetPath, err) case codes.FailedPrecondition: err = fmt.Errorf("volume %q is already published on another node and does not have MULTI_NODE volume capability: %v", - volumeID, err) + req.ExternalID, err) case codes.Internal: err = fmt.Errorf("node plugin returned an internal error, check the plugin allocation logs for more information: %v", err) } diff --git a/plugins/csi/client_test.go b/plugins/csi/client_test.go index 04e574947f5..265d234b83a 100644 --- a/plugins/csi/client_test.go +++ b/plugins/csi/client_test.go @@ -628,8 +628,11 @@ func TestClient_RPC_NodeStageVolume(t *testing.T) { nc.NextErr = tc.ResponseErr nc.NextStageVolumeResponse = tc.Response - err := client.NodeStageVolume(context.TODO(), "foo", nil, "/path", - &VolumeCapability{}, structs.CSISecrets{}) + err := client.NodeStageVolume(context.TODO(), &NodeStageVolumeRequest{ + ExternalID: "foo", + StagingTargetPath: "/path", + VolumeCapability: &VolumeCapability{}, + }) if tc.ExpectedErr != nil { require.EqualError(t, err, tc.ExpectedErr.Error()) } else { diff --git a/plugins/csi/fake/client.go b/plugins/csi/fake/client.go index b54cb144d63..e593dba6f4e 100644 --- a/plugins/csi/fake/client.go +++ b/plugins/csi/fake/client.go @@ -8,7 +8,6 @@ import ( "fmt" "sync" - "github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/nomad/plugins/base" "github.com/hashicorp/nomad/plugins/csi" "github.com/hashicorp/nomad/plugins/shared/hclspec" @@ -192,7 +191,7 @@ func (c *Client) NodeGetInfo(ctx context.Context) (*csi.NodeGetInfoResponse, err // NodeStageVolume is used when a plugin has the STAGE_UNSTAGE volume capability // to prepare a volume for usage on a host. If err == nil, the response should // be assumed to be successful. -func (c *Client) NodeStageVolume(ctx context.Context, volumeID string, publishContext map[string]string, stagingTargetPath string, capabilities *csi.VolumeCapability, secrets structs.CSISecrets, opts ...grpc.CallOption) error { +func (c *Client) NodeStageVolume(ctx context.Context, req *csi.NodeStageVolumeRequest, opts ...grpc.CallOption) error { c.Mu.Lock() defer c.Mu.Unlock() diff --git a/plugins/csi/plugin.go b/plugins/csi/plugin.go index 297b63f2293..df68bdce22e 100644 --- a/plugins/csi/plugin.go +++ b/plugins/csi/plugin.go @@ -56,7 +56,7 @@ type CSIPlugin interface { // NodeStageVolume is used when a plugin has the STAGE_UNSTAGE volume capability // to prepare a volume for usage on a host. If err == nil, the response should // be assumed to be successful. - NodeStageVolume(ctx context.Context, volumeID string, publishContext map[string]string, stagingTargetPath string, capabilities *VolumeCapability, secrets structs.CSISecrets, opts ...grpc.CallOption) error + NodeStageVolume(ctx context.Context, req *NodeStageVolumeRequest, opts ...grpc.CallOption) error // NodeUnstageVolume is used when a plugin has the STAGE_UNSTAGE volume capability // to undo the work performed by NodeStageVolume. If a volume has been staged, @@ -114,6 +114,11 @@ type NodePublishVolumeRequest struct { // Secrets required by plugins to complete the node publish volume // request. This field is OPTIONAL. Secrets structs.CSISecrets + + // Volume context as returned by SP in the CSI + // CreateVolumeResponse.Volume.volume_context which we don't implement but + // can be entered by hand in the volume spec. This field is OPTIONAL. + VolumeContext map[string]string } func (r *NodePublishVolumeRequest) ToCSIRepresentation() *csipbv1.NodePublishVolumeRequest { @@ -129,6 +134,7 @@ func (r *NodePublishVolumeRequest) ToCSIRepresentation() *csipbv1.NodePublishVol VolumeCapability: r.VolumeCapability.ToCSIRepresentation(), Readonly: r.Readonly, Secrets: r.Secrets, + VolumeContext: r.VolumeContext, } } @@ -148,6 +154,69 @@ func (r *NodePublishVolumeRequest) Validate() error { return nil } +type NodeStageVolumeRequest struct { + // The external ID of the volume to stage. + ExternalID string + + // If the volume was attached via a call to `ControllerPublishVolume` then + // we need to provide the returned PublishContext here. + PublishContext map[string]string + + // The path to which the volume MAY be staged. It MUST be an + // absolute path in the root filesystem of the process serving this + // request, and MUST be a directory. The CO SHALL ensure that there + // is only one `staging_target_path` per volume. The CO SHALL ensure + // that the path is directory and that the process serving the + // request has `read` and `write` permission to that directory. The + // CO SHALL be responsible for creating the directory if it does not + // exist. + // This is a REQUIRED field. + StagingTargetPath string + + // Volume capability describing how the CO intends to use this volume. + VolumeCapability *VolumeCapability + + // Secrets required by plugins to complete the node stage volume + // request. This field is OPTIONAL. + Secrets structs.CSISecrets + + // Volume context as returned by SP in the CSI + // CreateVolumeResponse.Volume.volume_context which we don't implement but + // can be entered by hand in the volume spec. This field is OPTIONAL. + VolumeContext map[string]string +} + +func (r *NodeStageVolumeRequest) ToCSIRepresentation() *csipbv1.NodeStageVolumeRequest { + if r == nil { + return nil + } + + return &csipbv1.NodeStageVolumeRequest{ + VolumeId: r.ExternalID, + PublishContext: r.PublishContext, + StagingTargetPath: r.StagingTargetPath, + VolumeCapability: r.VolumeCapability.ToCSIRepresentation(), + Secrets: r.Secrets, + VolumeContext: r.VolumeContext, + } +} + +func (r *NodeStageVolumeRequest) Validate() error { + if r.ExternalID == "" { + return errors.New("missing volume ID") + } + + if r.StagingTargetPath == "" { + return errors.New("missing StagingTargetPath") + } + + if r.VolumeCapability == nil { + return errors.New("missing VolumeCapabilities") + } + + return nil +} + type PluginCapabilitySet struct { hasControllerService bool hasTopologies bool