Skip to content

Commit

Permalink
feat(CSI-252): implement kubelet PVC stats
Browse files Browse the repository at this point in the history
  • Loading branch information
sergeyberezansky committed Sep 11, 2024
1 parent 1e4001d commit 39100a0
Show file tree
Hide file tree
Showing 3 changed files with 78 additions and 3 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ require (
github.com/google/uuid v1.6.0
github.com/hashicorp/go-version v1.7.0
github.com/kubernetes-csi/csi-lib-utils v0.19.0
github.com/pkg/errors v0.9.1
github.com/pkg/xattr v0.4.10
github.com/prometheus/client_golang v1.20.2
github.com/rs/zerolog v1.33.0
Expand Down
1 change: 1 addition & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ github.com/opencontainers/runc v1.1.13 h1:98S2srgG9vw0zWcDpFMn5TRrh8kLxa/5OFUstu
github.com/opencontainers/runc v1.1.13/go.mod h1:R016aXacfp/gwQBYw2FDGa9m+n6atbLWrYY8hNMT/sA=
github.com/opencontainers/runtime-spec v1.2.0 h1:z97+pHb3uELt/yiAWD691HNHQIF07bE7dzrbT927iTk=
github.com/opencontainers/runtime-spec v1.2.0/go.mod h1:jwyrGlmzljRJv/Fgzds9SsS/C5hL+LL3ko9hs6T5lQ0=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/xattr v0.4.10 h1:Qe0mtiNFHQZ296vRgUjRCoPHPqH7VdTOrZx3g0T+pGA=
github.com/pkg/xattr v0.4.10/go.mod h1:di8WF84zAKk8jzR1UBTEWh9AUlIZZ7M/JNt8e9B6ktU=
Expand Down
79 changes: 76 additions & 3 deletions pkg/wekafs/nodeserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"
"fmt"
"github.com/container-storage-interface/spec/lib/go/csi"
"github.com/pkg/errors"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
"go.opentelemetry.io/otel"
Expand All @@ -32,6 +33,7 @@ import (
"path/filepath"
"strings"
"sync"
"syscall"
"time"
)

Expand Down Expand Up @@ -84,16 +86,87 @@ func (ns *NodeServer) NodeExpandVolume(ctx context.Context, request *csi.NodeExp
panic("implement me")
}

//goland:noinspection GoUnusedParameter
func (ns *NodeServer) NodeGetVolumeStats(ctx context.Context, request *csi.NodeGetVolumeStatsRequest) (*csi.NodeGetVolumeStatsResponse, error) {
panic("implement me")
func (ns *NodeServer) NodeGetVolumeStats(ctx context.Context, req *csi.NodeGetVolumeStatsRequest) (*csi.NodeGetVolumeStatsResponse, error) {
volumeID := req.GetVolumeId()
volumePath := req.GetVolumePath()

// Validate request fields
if volumeID == "" || volumePath == "" {
return nil, status.Error(codes.InvalidArgument, "Volume ID and path must be provided")
}

// Check if the volume path exists
if ns.getConfig().isInDevMode() {
// In dev mode, we don't have the actual Weka mount, so we just check if the path exists
if _, err := os.Stat(volumePath); err != nil {
return nil, status.Error(codes.NotFound, "Volume path not found")
}

} else {
// In production mode, we check if the path is indeed a Weka mount (Either NFS or WekaFS)
if !PathIsWekaMount(ctx, volumePath) {
return nil, status.Error(codes.NotFound, "Volume path not found")
}
}

// Validate Weka volume ID
if err := validateVolumeId(volumeID); err != nil {
return nil, status.Error(codes.InvalidArgument, errors.Wrap(err, "invalid volume ID").Error())
}

capacityBytes, usedBytes, availableBytes, err := getVolumeStats(volumePath)
if err != nil {
return &csi.NodeGetVolumeStatsResponse{
Usage: nil,
VolumeCondition: &csi.VolumeCondition{
Abnormal: true,
Message: "Failed to fetch volume stats for volume",
},
}, status.Errorf(codes.Internal, "Failed to get stats for volume %s: %v", volumeID, err)
}
// Prepare response
return &csi.NodeGetVolumeStatsResponse{
Usage: []*csi.VolumeUsage{
{
Unit: csi.VolumeUsage_BYTES,
Total: capacityBytes,
Used: usedBytes,
Available: availableBytes,
},
},
VolumeCondition: &csi.VolumeCondition{
Abnormal: false,
Message: "volume is healthy",
},
}, nil
}

// getVolumeStats fetches filesystem statistics for the mounted volume path.
func getVolumeStats(volumePath string) (capacityBytes, usedBytes, availableBytes int64, err error) {
var stat syscall.Statfs_t

// Use Statfs to get filesystem statistics for the volume path
err = syscall.Statfs(volumePath, &stat)
if err != nil {
return 0, 0, 0, err
}

// Calculate capacity, available, and used space in bytes
capacityBytes = int64(stat.Blocks) * int64(stat.Bsize)
availableBytes = int64(stat.Bavail) * int64(stat.Bsize)
usedBytes = capacityBytes - availableBytes

return capacityBytes, usedBytes, availableBytes, nil
}

func NewNodeServer(nodeId string, maxVolumesPerNode int64, api *ApiStore, mounter AnyMounter, config *DriverConfig) *NodeServer {
//goland:noinspection GoBoolExpressions
return &NodeServer{
caps: getNodeServiceCapabilities(
[]csi.NodeServiceCapability_RPC_Type{
csi.NodeServiceCapability_RPC_SINGLE_NODE_MULTI_WRITER,
csi.NodeServiceCapability_RPC_GET_VOLUME_STATS,
csi.NodeServiceCapability_RPC_VOLUME_CONDITION,
//csi.NodeServiceCapability_RPC_EXPAND_VOLUME,
},
),
Expand Down

0 comments on commit 39100a0

Please sign in to comment.