Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(CSI-211): support new API paths nodes->processes as per cluster … #247

Merged
merged 1 commit into from
Jun 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions pkg/wekafs/apiclient/apiclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -557,8 +557,8 @@ type ApiResponse struct {
// ApiObject generic interface of API object of any type (FileSystem, Quota, etc.)
type ApiObject interface {
GetType() string
GetBasePath() string
GetApiUrl() string
GetBasePath(a *ApiClient) string
GetApiUrl(a *ApiClient) string
EQ(other ApiObject) bool
getImmutableFields() []string
String() string
Expand All @@ -569,7 +569,7 @@ type ApiObjectRequest interface {
getRequiredFields() []string
hasRequiredFields() bool
getRelatedObject() ApiObject
getApiUrl() string
getApiUrl(a *ApiClient) string
String() string
}

Expand Down
1 change: 1 addition & 0 deletions pkg/wekafs/apiclient/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ func (a *ApiClient) fetchClusterInfo(ctx context.Context) error {
logger.Info().Msg(fmt.Sprintf("Cluster compatibility for new filesystem from snapshot: %t", a.SupportsNewFileSystemFromSnapshot()))
logger.Info().Msg(fmt.Sprintf("Cluster compatibility for cloning filesystems: %t", a.SupportsFilesystemCloning()))
logger.Info().Msg(fmt.Sprintf("Cluster compatibility for supporting multiple connections: %t", a.SupportsMultipleClusters()))
logger.Info().Msg(fmt.Sprintf("Cluster requires using new API path for nodes (nodes->processes): %t", a.RequiresNewNodePath()))
return nil
}

Expand Down
10 changes: 10 additions & 0 deletions pkg/wekafs/apiclient/compatibility.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ type WekaCompatibilityRequiredVersions struct {
UrlQueryParams string
SyncOnCloseMountOption string
SingleClientMultipleClusters string
NewNodeApiObjectPath string
}

var MinimumSupportedWekaVersions = &WekaCompatibilityRequiredVersions{
Expand All @@ -31,6 +32,7 @@ var MinimumSupportedWekaVersions = &WekaCompatibilityRequiredVersions{
UrlQueryParams: "v4.0", // can perform URL query by fields
SyncOnCloseMountOption: "v4.2", // can perform sync_on_close mount option
SingleClientMultipleClusters: "v4.2", // single client can have multiple Weka cluster connections
NewNodeApiObjectPath: "v4.2", // new API object paths (processes, containers, etc.)
}

type WekaCompatibilityMap struct {
Expand All @@ -45,6 +47,7 @@ type WekaCompatibilityMap struct {
UrlQueryParams bool
SyncOnCloseMountOption bool
SingleClientMultipleClusters bool
NewNodeApiObjectPath bool
}

func (cm *WekaCompatibilityMap) fillIn(versionStr string) {
Expand All @@ -62,6 +65,7 @@ func (cm *WekaCompatibilityMap) fillIn(versionStr string) {
cm.UrlQueryParams = false
cm.SyncOnCloseMountOption = false
cm.SingleClientMultipleClusters = false
cm.NewNodeApiObjectPath = false
return
}
d, _ := version.NewVersion(MinimumSupportedWekaVersions.DirectoryAsCSIVolume)
Expand All @@ -75,6 +79,7 @@ func (cm *WekaCompatibilityMap) fillIn(versionStr string) {
u, _ := version.NewVersion(MinimumSupportedWekaVersions.UrlQueryParams)
sc, _ := version.NewVersion(MinimumSupportedWekaVersions.SyncOnCloseMountOption)
mc, _ := version.NewVersion(MinimumSupportedWekaVersions.SingleClientMultipleClusters)
nn, _ := version.NewVersion(MinimumSupportedWekaVersions.NewNodeApiObjectPath)

cm.DirectoryAsCSIVolume = v.GreaterThanOrEqual(d)
cm.FilesystemAsCSIVolume = v.GreaterThanOrEqual(f)
Expand All @@ -87,6 +92,7 @@ func (cm *WekaCompatibilityMap) fillIn(versionStr string) {
cm.UrlQueryParams = v.GreaterThanOrEqual(u)
cm.SyncOnCloseMountOption = v.GreaterThanOrEqual(sc)
cm.SingleClientMultipleClusters = v.GreaterThanOrEqual(mc)
cm.NewNodeApiObjectPath = v.GreaterThanOrEqual(nn)
}

func (a *ApiClient) SupportsQuotaDirectoryAsVolume() bool {
Expand Down Expand Up @@ -132,3 +138,7 @@ func (a *ApiClient) SupportsSyncOnCloseMountOption() bool {
func (a *ApiClient) SupportsMultipleClusters() bool {
return a.CompatibilityMap.SingleClientMultipleClusters
}

func (a *ApiClient) RequiresNewNodePath() bool {
return a.CompatibilityMap.NewNodeApiObjectPath
}
34 changes: 17 additions & 17 deletions pkg/wekafs/apiclient/filesystem.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func (a *ApiClient) GetFileSystemByUid(ctx context.Context, uid uuid.UUID, fs *F
ret := &FileSystem{
Uid: uid,
}
err := a.Get(ctx, ret.GetApiUrl(), nil, fs)
err := a.Get(ctx, ret.GetApiUrl(a), nil, fs)
if err != nil {
switch t := err.(type) {
case *ApiNotFoundError:
Expand All @@ -85,7 +85,7 @@ func (a *ApiClient) FindFileSystemsByFilter(ctx context.Context, query *FileSyst
ctx = log.With().Str("trace_id", span.SpanContext().TraceID().String()).Str("span_id", span.SpanContext().SpanID().String()).Str("op", op).Logger().WithContext(ctx)
ret := &[]FileSystem{}
q, _ := qs.Values(query)
err := a.Get(ctx, query.GetBasePath(), q, ret)
err := a.Get(ctx, query.GetBasePath(a), q, ret)
if err != nil {
return err
}
Expand Down Expand Up @@ -132,7 +132,7 @@ func (a *ApiClient) CreateFileSystem(ctx context.Context, r *FileSystemCreateReq
return err
}

err = a.Post(ctx, r.getRelatedObject().GetBasePath(), &payload, nil, fs)
err = a.Post(ctx, r.getRelatedObject().GetBasePath(a), &payload, nil, fs)
if err != nil {
return err
}
Expand Down Expand Up @@ -182,7 +182,7 @@ func (a *ApiClient) UpdateFileSystem(ctx context.Context, r *FileSystemResizeReq
if err != nil {
return err
}
err = a.Put(ctx, r.getApiUrl(), &payload, nil, fs)
err = a.Put(ctx, r.getApiUrl(a), &payload, nil, fs)
if err != nil {
return err
}
Expand All @@ -198,7 +198,7 @@ func (a *ApiClient) DeleteFileSystem(ctx context.Context, r *FileSystemDeleteReq
return RequestMissingParams
}
apiResponse := &ApiResponse{}
err := a.Delete(ctx, r.getApiUrl(), nil, nil, apiResponse)
err := a.Delete(ctx, r.getApiUrl(a), nil, nil, apiResponse)
if err != nil {
switch t := err.(type) {
case *ApiNotFoundError:
Expand All @@ -222,7 +222,7 @@ func (a *ApiClient) GetFileSystemMountToken(ctx context.Context, r *FileSystemMo
if !r.hasRequiredFields() {
return RequestMissingParams
}
err := a.Get(ctx, r.getApiUrl(), nil, token)
err := a.Get(ctx, r.getApiUrl(a), nil, token)
if err != nil {
log.Ctx(ctx).Error().Err(err).Msg("Failed to obtain a mount token")
return err
Expand Down Expand Up @@ -260,12 +260,12 @@ func (fs *FileSystem) GetType() string {
return "filesystem"
}

func (fs *FileSystem) GetBasePath() string {
func (fs *FileSystem) GetBasePath(a *ApiClient) string {
return "fileSystems"
}

func (fs *FileSystem) GetApiUrl() string {
url, err := urlutil.URLJoin(fs.GetBasePath(), fs.Uid.String())
func (fs *FileSystem) GetApiUrl(a *ApiClient) string {
url, err := urlutil.URLJoin(fs.GetBasePath(a), fs.Uid.String())
if err != nil {
return ""
}
Expand Down Expand Up @@ -297,8 +297,8 @@ type FileSystemCreateRequest struct {
AllowNoKms bool `json:"allow_no_kms,omitempty"`
}

func (fsc *FileSystemCreateRequest) getApiUrl() string {
return fsc.getRelatedObject().GetBasePath()
func (fsc *FileSystemCreateRequest) getApiUrl(a *ApiClient) string {
return fsc.getRelatedObject().GetBasePath(a)
}

func (fsc *FileSystemCreateRequest) getRequiredFields() []string {
Expand Down Expand Up @@ -339,8 +339,8 @@ func NewFileSystemResizeRequest(fsUid uuid.UUID, totalCapacity *int64) *FileSyst
return ret
}

func (fsu *FileSystemResizeRequest) getApiUrl() string {
url, err := urlutil.URLJoin(fsu.getRelatedObject().GetBasePath(), fsu.Uid.String())
func (fsu *FileSystemResizeRequest) getApiUrl(a *ApiClient) string {
url, err := urlutil.URLJoin(fsu.getRelatedObject().GetBasePath(a), fsu.Uid.String())
if err != nil {
return ""
}
Expand Down Expand Up @@ -371,8 +371,8 @@ func (fsd *FileSystemDeleteRequest) String() string {
return fmt.Sprintln("FileSystemDeleteRequest(fsUid:", fsd.Uid, ")")
}

func (fsd *FileSystemDeleteRequest) getApiUrl() string {
url, err := urlutil.URLJoin(fsd.getRelatedObject().GetBasePath(), fsd.Uid.String())
func (fsd *FileSystemDeleteRequest) getApiUrl(a *ApiClient) string {
url, err := urlutil.URLJoin(fsd.getRelatedObject().GetBasePath(a), fsd.Uid.String())
if err != nil {
return ""
}
Expand All @@ -399,8 +399,8 @@ func (fsm *FileSystemMountTokenRequest) String() string {
return fmt.Sprintln("FilesystemMountTokenRequest(fsUid:", fsm.Uid, ")")
}

func (fsm *FileSystemMountTokenRequest) getApiUrl() string {
url, err := urlutil.URLJoin(fsm.getRelatedObject().GetBasePath(), fsm.Uid.String(), "mountToken")
func (fsm *FileSystemMountTokenRequest) getApiUrl(a *ApiClient) string {
url, err := urlutil.URLJoin(fsm.getRelatedObject().GetBasePath(a), fsm.Uid.String(), "mountToken")
if err != nil {
return ""
}
Expand Down
15 changes: 10 additions & 5 deletions pkg/wekafs/apiclient/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,17 @@ func (n *WekaNode) GetType() string {
return "wekanode"
}

func (n *WekaNode) GetBasePath() string {
func (n *WekaNode) GetBasePath(a *ApiClient) string {
if a != nil {
if a.CompatibilityMap.NewNodeApiObjectPath {
return "processes"
}
}
return "nodes"
}

func (n *WekaNode) GetApiUrl() string {
url, err := urlutil.URLJoin(n.GetBasePath(), n.Uid.String())
func (n *WekaNode) GetApiUrl(a *ApiClient) string {
url, err := urlutil.URLJoin(n.GetBasePath(a), n.Uid.String())
if err != nil {
return url
}
Expand Down Expand Up @@ -79,7 +84,7 @@ func (n *WekaNode) isDrive() bool {
func (a *ApiClient) GetNodes(ctx context.Context, nodes *[]WekaNode) error {
node := &WekaNode{}

err := a.Get(ctx, node.GetBasePath(), nil, nodes)
err := a.Get(ctx, node.GetBasePath(a), nil, nodes)
if err != nil {
return err
}
Expand All @@ -104,7 +109,7 @@ func (a *ApiClient) GetNodeByUid(ctx context.Context, uid uuid.UUID, node *WekaN
n := &WekaNode{
Uid: uid,
}
err := a.Get(ctx, n.GetApiUrl(), nil, node)
err := a.Get(ctx, n.GetApiUrl(a), nil, node)
if err != nil {
return err
}
Expand Down
30 changes: 15 additions & 15 deletions pkg/wekafs/apiclient/quota.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,17 +43,17 @@ func (q *Quota) GetType() string {
return "quota"
}

func (q *Quota) GetBasePath() string {
fsUrl := (&FileSystem{Uid: q.FilesystemUid}).GetApiUrl()
func (q *Quota) GetBasePath(a *ApiClient) string {
fsUrl := (&FileSystem{Uid: q.FilesystemUid}).GetApiUrl(a)
url, err := urlutil.URLJoin(fsUrl, q.GetType())
if err != nil {
return ""
}
return url
}

func (q *Quota) GetApiUrl() string {
url, err := urlutil.URLJoin(q.GetBasePath(), strconv.FormatUint(q.InodeId, 10))
func (q *Quota) GetApiUrl(a *ApiClient) string {
url, err := urlutil.URLJoin(q.GetBasePath(a), strconv.FormatUint(q.InodeId, 10))
if err != nil {
return ""
}
Expand Down Expand Up @@ -96,8 +96,8 @@ type QuotaCreateRequest struct {
capacityLimit uint64
}

func (qc *QuotaCreateRequest) getApiUrl() string {
return qc.getRelatedObject().GetApiUrl()
func (qc *QuotaCreateRequest) getApiUrl(a *ApiClient) string {
return qc.getRelatedObject().GetApiUrl(a)
}

func (qc *QuotaCreateRequest) getRequiredFields() []string {
Expand Down Expand Up @@ -126,8 +126,8 @@ type QuotaUpdateRequest struct {
capacityLimit uint64
}

func (qu *QuotaUpdateRequest) getApiUrl() string {
return qu.getRelatedObject().GetApiUrl()
func (qu *QuotaUpdateRequest) getApiUrl(a *ApiClient) string {
return qu.getRelatedObject().GetApiUrl(a)
}

func (qu *QuotaUpdateRequest) getRequiredFields() []string {
Expand Down Expand Up @@ -200,8 +200,8 @@ func (qd *QuotaDeleteRequest) String() string {
return fmt.Sprintln("QuotaDeleteRequest(fsUid:", qd.filesystemUid, "inodeId:", qd.InodeId, ")")
}

func (qd *QuotaDeleteRequest) getApiUrl() string {
url, err := urlutil.URLJoin((&FileSystem{Uid: qd.filesystemUid}).GetApiUrl(), "quotas", strconv.FormatUint(qd.InodeId, 10))
func (qd *QuotaDeleteRequest) getApiUrl(a *ApiClient) string {
url, err := urlutil.URLJoin((&FileSystem{Uid: qd.filesystemUid}).GetApiUrl(a), "quotas", strconv.FormatUint(qd.InodeId, 10))
if err != nil {
return ""
}
Expand Down Expand Up @@ -236,7 +236,7 @@ func (a *ApiClient) CreateQuota(ctx context.Context, qr *QuotaCreateRequest, q *
return err
}

err = a.Put(ctx, qr.getApiUrl(), &payload, nil, q)
err = a.Put(ctx, qr.getApiUrl(a), &payload, nil, q)
if err != nil {
return err
}
Expand Down Expand Up @@ -269,7 +269,7 @@ func (a *ApiClient) FindQuotaByFilter(ctx context.Context, query *Quota, resultS
return RequestMissingParams
}
ret := &[]Quota{}
err := a.Get(ctx, query.GetBasePath(), nil, ret)
err := a.Get(ctx, query.GetBasePath(a), nil, ret)
if err != nil {
return err
}
Expand All @@ -295,7 +295,7 @@ func (a *ApiClient) GetQuotaByFileSystemAndInode(ctx context.Context, fs *FileSy
FilesystemUid: fs.Uid,
InodeId: inodeId,
}
err := a.Get(ctx, ret.GetApiUrl(), nil, ret)
err := a.Get(ctx, ret.GetApiUrl(a), nil, ret)
if err != nil {
switch t := err.(type) {
case ApiNotFoundError:
Expand Down Expand Up @@ -359,7 +359,7 @@ func (a *ApiClient) UpdateQuota(ctx context.Context, r *QuotaUpdateRequest, q *Q
if err != nil {
return err
}
err = a.Put(ctx, r.getApiUrl(), &payload, nil, q)
err = a.Put(ctx, r.getApiUrl(a), &payload, nil, q)
if err != nil {
return err
}
Expand All @@ -375,7 +375,7 @@ func (a *ApiClient) DeleteQuota(ctx context.Context, r *QuotaDeleteRequest) erro
return RequestMissingParams
}
apiResponse := &ApiResponse{}
err := a.Delete(ctx, r.getApiUrl(), nil, nil, apiResponse)
err := a.Delete(ctx, r.getApiUrl(a), nil, nil, apiResponse)
if err != nil {
return err
}
Expand Down
Loading
Loading