Skip to content

Commit

Permalink
Cached into shared staging dir
Browse files Browse the repository at this point in the history
  • Loading branch information
angelini committed Jul 10, 2024
1 parent b12b353 commit 33955f1
Showing 1 changed file with 8 additions and 26 deletions.
34 changes: 8 additions & 26 deletions pkg/api/cached.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@ package api

import (
"context"
"crypto/rand"
"encoding/base64"
"errors"
"fmt"
"math"
Expand Down Expand Up @@ -41,9 +39,7 @@ type Cached struct {
Client *client.Client
StagingPath string

// the current directory holding a fully formed downloaded cache
currentDir string
// the current version of the cache on disk at currentDir
// the current version of the cache on disk
currentVersion int64
}

Expand All @@ -67,23 +63,18 @@ func (c *Cached) PopulateDiskCache(ctx context.Context, req *pb.PopulateDiskCach
return &pb.PopulateDiskCacheResponse{Version: version}, nil
}

// Fetch the cache into a spot in the staging dir
// Fetch the cache into the staging dir
func (c *Cached) Prepare(ctx context.Context) error {
start := time.Now()
folderName, err := randomString()
if err != nil {
return err
}
newDir := path.Join(c.StagingPath, folderName)
version, count, err := c.Client.GetCache(ctx, newDir)

version, count, err := c.Client.GetCache(ctx, c.StagingPath)
if err != nil {
return err
}

c.currentDir = newDir
c.currentVersion = version

logger.Info(ctx, "downloaded golden copy", key.Directory.Field(newDir), key.DurationMS.Field(time.Since(start)), key.Version.Field(version), key.Count.Field(int64(count)))
logger.Info(ctx, "downloaded golden copy", key.DurationMS.Field(time.Since(start)), key.Version.Field(version), key.Count.Field(int64(count)))
return nil
}

Expand All @@ -109,7 +100,7 @@ func (c *Cached) GetPluginCapabilities(ctx context.Context, req *csi.GetPluginCa
// Probe returns the health and readiness of the plugin
func (c *Cached) Probe(ctx context.Context, req *csi.ProbeRequest) (*csi.ProbeResponse, error) {
ready := true
if c.currentDir == "" {
if c.currentVersion == 0 {
ready = false
logger.Warn(ctx, "csi probe failed as daemon hasn't prepared cache yet", key.Version.Field(c.currentVersion))
}
Expand Down Expand Up @@ -255,7 +246,7 @@ func (c *Cached) NodeGetVolumeStats(ctx context.Context, req *csi.NodeGetVolumeS
// check if the destination exists, and if so, if its writable
// hardlink the golden copy into this downstream's destination, creating it if need be
func (c *Cached) writeCache(destination string) (int64, error) {
if c.currentDir == "" {
if c.currentVersion == 0 {
return -1, errors.New("no cache prepared, currentDir is nil")
}

Expand All @@ -274,7 +265,7 @@ func (c *Cached) writeCache(destination string) (int64, error) {
}
}

err = files.HardlinkDir(c.currentDir, destination)
err = files.HardlinkDir(c.StagingPath, destination)
if err != nil {
return -1, fmt.Errorf("failed to hardlink cache to destination %s: %v", destination, err)
}
Expand All @@ -301,12 +292,3 @@ func getFolderSize(path string) (int64, error) {
})
return totalSize, err
}

func randomString() (string, error) {
// Generate a secure random string for the temporary directory name
randBytes := make([]byte, 10) // Adjust the size of the byte slice as needed
if _, err := rand.Read(randBytes); err != nil {
return "", err
}
return base64.URLEncoding.EncodeToString(randBytes), nil
}

0 comments on commit 33955f1

Please sign in to comment.