Skip to content

Commit

Permalink
Merge pull request PelicanPlatform#1280 from haoming29/fix-cache-self…
Browse files Browse the repository at this point in the history
…-test-failure

Fix cache self-test failure
  • Loading branch information
turetske authored May 15, 2024
2 parents ee3ea0c + de19136 commit 694c735
Show file tree
Hide file tree
Showing 8 changed files with 110 additions and 71 deletions.
48 changes: 2 additions & 46 deletions cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,11 @@ package cache

import (
"context"
"errors"
"os"
"path/filepath"
"time"

"github.com/gin-gonic/gin"
"github.com/pelicanplatform/pelican/param"
"github.com/pelicanplatform/pelican/server_utils"
"golang.org/x/sync/errgroup"

"github.com/pelicanplatform/pelican/server_utils"
)

var (
Expand All @@ -44,43 +40,3 @@ func RegisterCacheAPI(router *gin.Engine, ctx context.Context, egrp *errgroup.Gr
group.POST("/directorTest", func(ginCtx *gin.Context) { server_utils.HandleDirectorTestResponse(ginCtx, notificationChan) })
}
}

// Periodically scan the /<runLocation>/pelican/monitoring directory to clean up test files
func LaunchDirectorTestFileCleanup(ctx context.Context) {
server_utils.LaunchWatcherMaintenance(ctx,
[]string{filepath.Join(param.Cache_DataLocation.GetString(), "pelican", "monitoring")},
"cache director-based health test clean up",
time.Minute,
func(notifyEvent bool) error {
// We run this function regardless of notifyEvent to do the cleanup
dirPath := filepath.Join(param.Cache_DataLocation.GetString(), "pelican", "monitoring")
dirInfo, err := os.Stat(dirPath)
if err != nil {
return err
} else {
if !dirInfo.IsDir() {
return errors.New("monitoring path is not a directory: " + dirPath)
}
}
dirItems, err := os.ReadDir(dirPath)
if err != nil {
return err
}
if len(dirItems) <= 2 { // At mininum there are the test file and .cinfo file, and we don't want to remove the last two
return nil
}
for idx, item := range dirItems {
// For all but the latest two files (test file and its .cinfo file)
// os.ReadDir sorts dirEntries in order of file names. Since our test file names are timestamped and is string comparable,
// the last two files should be the latest test files, which we want to keep
if idx < len(dirItems)-2 {
err := os.Remove(filepath.Join(dirPath, item.Name()))
if err != nil {
return err
}
}
}
return nil
},
)
}
63 changes: 61 additions & 2 deletions cache/cache_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,25 @@
package cache

import (
"context"
"io/fs"
"os"
"path"
"path/filepath"
"strings"
"time"

"github.com/pelicanplatform/pelican/param"
"github.com/pkg/errors"

"github.com/pelicanplatform/pelican/param"
"github.com/pelicanplatform/pelican/server_utils"
)

// Check for the sentinel file
func CheckCacheSentinelLocation() error {
if param.Cache_SentinelLocation.IsSet() {
sentinelPath := param.Cache_SentinelLocation.GetString()
dataLoc := param.Cache_DataLocation.GetString()
dataLoc := param.Cache_LocalRoot.GetString()
sentinelPath = path.Clean(sentinelPath)
if path.Base(sentinelPath) != sentinelPath {
return errors.Errorf("invalid Cache.SentinelLocation path. File must not contain a directory. Got %s", sentinelPath)
Expand All @@ -44,3 +50,56 @@ func CheckCacheSentinelLocation() error {
}
return nil
}

// Periodically scan the /<Cache.LocalRoot>/pelican/monitoring directory to clean up test files
// TODO: Director test files should be under /pelican/monitoring/directorTest and the file names
// should have director-test- as the prefix
func LaunchDirectorTestFileCleanup(ctx context.Context) {
server_utils.LaunchWatcherMaintenance(ctx,
[]string{filepath.Join(param.Cache_LocalRoot.GetString(), "pelican", "monitoring")},
"cache director-based health test clean up",
time.Minute,
func(notifyEvent bool) error {
// We run this function regardless of notifyEvent to do the cleanup
dirPath := filepath.Join(param.Cache_LocalRoot.GetString(), "pelican", "monitoring")
dirInfo, err := os.Stat(dirPath)
if err != nil {
return err
} else {
if !dirInfo.IsDir() {
return errors.New("monitoring path is not a directory: " + dirPath)
}
}
dirItems, err := os.ReadDir(dirPath)
if err != nil {
return err
}
directorItems := []fs.DirEntry{}
for _, item := range dirItems {
if item.IsDir() {
continue
}
// Ignore self tests. They should be handled automatically by self test logic
if strings.HasPrefix(item.Name(), selfTestPrefix) {
continue
}
directorItems = append(directorItems, item)
}
if len(directorItems) <= 2 { // At mininum there are the test file and .cinfo file, and we don't want to remove the last two
return nil
}
for idx, item := range directorItems {
// For all but the latest two files (test file and its .cinfo file)
// os.ReadDir sorts dirEntries in order of file names. Since our test file names are timestamped and is string comparable,
// the last two files should be the latest test files, which we want to keep
if idx < len(dirItems)-2 {
err := os.Remove(filepath.Join(dirPath, item.Name()))
if err != nil {
return err
}
}
}
return nil
},
)
}
4 changes: 2 additions & 2 deletions cache/cache_api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func TestCheckCacheSentinelLocation(t *testing.T) {
tmpDir := t.TempDir()
viper.Reset()
viper.Set(param.Cache_SentinelLocation.GetName(), "test.txt")
viper.Set(param.Cache_DataLocation.GetName(), tmpDir)
viper.Set(param.Cache_LocalRoot.GetName(), tmpDir)
err := CheckCacheSentinelLocation()
require.Error(t, err)
assert.Contains(t, err.Error(), "failed to open Cache.SentinelLocation")
Expand All @@ -59,7 +59,7 @@ func TestCheckCacheSentinelLocation(t *testing.T) {
viper.Reset()

viper.Set(param.Cache_SentinelLocation.GetName(), "test.txt")
viper.Set(param.Cache_DataLocation.GetName(), tmpDir)
viper.Set(param.Cache_LocalRoot.GetName(), tmpDir)

file, err := os.Create(filepath.Join(tmpDir, "test.txt"))
require.NoError(t, err)
Expand Down
41 changes: 25 additions & 16 deletions cache/self_monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,9 @@ import (
)

const (
selfTestBody string = "This object was created by the Pelican self-test functionality"
selfTestDir string = "/pelican/monitoring"
selfTestBody string = "This object was created by the Pelican self-test functionality"
selfTestDir string = "/pelican/monitoring/selfTest"
selfTestPrefix string = "self-test-"
)

// Add self-test directories to xrootd data location of the cache
Expand All @@ -56,10 +57,11 @@ func InitSelfTestDir() error {
return err
}

basePath := param.Cache_DataLocation.GetString()
basePath := param.Cache_LocalRoot.GetString()
pelicanMonPath := filepath.Join(basePath, "/pelican")
monitoringPath := filepath.Join(pelicanMonPath, "/monitoring")
err = os.MkdirAll(monitoringPath, 0700)
selfTestPath := filepath.Join(monitoringPath, "/selfTest")
err = os.MkdirAll(selfTestPath, 0700)
if err != nil {
return errors.Wrap(err, "Fail to create directory for the self-test")
}
Expand All @@ -69,15 +71,21 @@ func InitSelfTestDir() error {
if err = os.Chown(monitoringPath, uid, gid); err != nil {
return errors.Wrapf(err, "Unable to change ownership of self-test /pelican/monitoring directory %v to desired daemon gid %v", monitoringPath, gid)
}
if err = os.Chown(selfTestPath, uid, gid); err != nil {
return errors.Wrapf(err, "Unable to change ownership of self-test /pelican/monitoring directory %v to desired daemon gid %v", monitoringPath, gid)
}
return nil
}

func generateTestFile() (string, error) {
basePath := param.Cache_DataLocation.GetString()
monitoringPath := filepath.Join(basePath, selfTestDir)
_, err := os.Stat(monitoringPath)
basePath := param.Cache_LocalRoot.GetString()
if basePath == "" {
return "", errors.New("failed to generate self-test file for cache: Cache.LocalRoot is not set.")
}
selfTestPath := filepath.Join(basePath, selfTestDir)
_, err := os.Stat(selfTestPath)
if err != nil {
return "", errors.Wrap(err, "self-test directory does not exist at "+monitoringPath)
return "", errors.Wrap(err, "self-test directory does not exist at "+selfTestPath)
}
uid, err := config.GetDaemonUID()
if err != nil {
Expand All @@ -104,13 +112,13 @@ func generateTestFile() (string, error) {
return "", err
}

testFileName := "self-test-" + now.Format(time.RFC3339) + ".txt"
testFileCinfoName := "self-test-" + now.Format(time.RFC3339) + ".txt.cinfo"
testFileName := selfTestPrefix + now.Format(time.RFC3339) + ".txt"
testFileCinfoName := selfTestPrefix + now.Format(time.RFC3339) + ".txt.cinfo"

finalFilePath := filepath.Join(monitoringPath, testFileName)
finalFilePath := filepath.Join(selfTestPath, testFileName)

tmpFileCinfoPath := filepath.Join(monitoringPath, testFileCinfoName+".tmp")
finalFileCinfoPath := filepath.Join(monitoringPath, testFileCinfoName)
tmpFileCinfoPath := filepath.Join(selfTestPath, testFileCinfoName+".tmp")
finalFileCinfoPath := filepath.Join(selfTestPath, testFileCinfoName)

// This is for web URL path, do not use filepath
extFilePath := path.Join(selfTestDir, testFileName)
Expand All @@ -120,6 +128,7 @@ func generateTestFile() (string, error) {
return "", errors.Wrapf(err, "failed to create self-test file %s", finalFilePath)
}
defer file.Close()
defer log.Debug("Cache self-test file created at: ", finalFilePath)

cinfoFile, err := os.OpenFile(tmpFileCinfoPath, os.O_WRONLY|os.O_CREATE, 0600)
if err != nil {
Expand Down Expand Up @@ -165,7 +174,7 @@ func generateFileTestScitoken() (string, error) {
fTestTokenCfg.Lifetime = time.Minute
fTestTokenCfg.Issuer = issuerUrl
fTestTokenCfg.Subject = "cache"
fTestTokenCfg.Claims = map[string]string{"scope": "storage.read:/pelican/monitoring"}
fTestTokenCfg.Claims = map[string]string{"scope": "storage.read:/pelican/monitoring/selfTest"}
// For self-tests, the audience is the server itself
fTestTokenCfg.AddAudienceAny()

Expand Down Expand Up @@ -207,14 +216,14 @@ func downloadTestFile(ctx context.Context, fileUrl string) error {
return errors.Wrap(err, "failed to get response body from cache self-test download")
}
if string(body) != selfTestBody {
return errors.Errorf("contents of cache self-test file do not match the one uploaded: %v", body)
return errors.Errorf("contents of cache self-test file do not match the one uploaded. Expected: %s \nGot: %s", selfTestBody, string(body))
}

return nil
}

func deleteTestFile(fileUrlStr string) error {
basePath := param.Cache_DataLocation.GetString()
basePath := param.Cache_LocalRoot.GetString()
fileUrl, err := url.Parse(fileUrlStr)
if err != nil {
return errors.Wrap(err, "invalid file url to remove the test file")
Expand Down
2 changes: 1 addition & 1 deletion client/fed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -545,7 +545,7 @@ func TestDirectReads(t *testing.T) {
assert.Equal(t, transferResults[0].TransferredBytes, int64(17))

// Assert that the file was not cached
cacheDataLocation := param.Cache_DataLocation.GetString() + export.FederationPrefix
cacheDataLocation := param.Cache_LocalRoot.GetString() + export.FederationPrefix
filepath := filepath.Join(cacheDataLocation, filepath.Base(tempFile.Name()))
_, err = os.Stat(filepath)
assert.True(t, os.IsNotExist(err))
Expand Down
2 changes: 1 addition & 1 deletion cmd/fed_serve_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func TestFedServeCache(t *testing.T) {
viper.Set("ConfigDir", tmpPath)
viper.Set("Origin.RunLocation", filepath.Join(tmpPath, "xOrigin"))
viper.Set("Cache.RunLocation", filepath.Join(tmpPath, "xCache"))
viper.Set("Cache.DataLocation", filepath.Join(tmpPath, "data"))
viper.Set("Cache.LocalRoot", filepath.Join(tmpPath, "data"))
viper.Set("Origin.StoragePrefix", filepath.Join(origPath, "ns"))
viper.Set("Origin.FederationPrefix", "/test")
testFilePath := filepath.Join(origPath, "ns", "test-file.txt")
Expand Down
19 changes: 17 additions & 2 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -1077,21 +1077,32 @@ func InitServer(ctx context.Context, currentServers ServerType) error {
}
}

if param.Cache_DataLocation.IsSet() {
log.Warningf("Deprecated configuration key %s is set. Please migrate to use %s instead", param.Cache_DataLocation.GetName(), param.Cache_LocalRoot.GetName())
log.Warningf("Will attempt to use the value of %s as default for %s", param.Cache_DataLocation.GetName(), param.Cache_LocalRoot.GetName())
}

if IsRootExecution() {
if currentServers.IsEnabled(OriginType) {
viper.SetDefault("Origin.RunLocation", filepath.Join("/run", "pelican", "xrootd", "origin"))
}
if currentServers.IsEnabled(CacheType) {
viper.SetDefault("Cache.RunLocation", filepath.Join("/run", "pelican", "xrootd", "cache"))
}
viper.SetDefault("Cache.LocalRoot", "/run/pelican/cache")

// To ensure Cache.DataLocation still works, we default Cache.LocalRoot to Cache.DataLocation
// The logic is extracted from handleDeprecatedConfig as we manually set the default value here
viper.SetDefault(param.Cache_DataLocation.GetName(), "/run/pelican/cache")
viper.SetDefault(param.Cache_LocalRoot.GetName(), param.Cache_DataLocation.GetString())

if viper.IsSet("Cache.DataLocation") {
viper.SetDefault("Cache.DataLocations", []string{filepath.Join(param.Cache_DataLocation.GetString(), "data")})
viper.SetDefault("Cache.MetaLocations", []string{filepath.Join(param.Cache_DataLocation.GetString(), "meta")})
} else {
viper.SetDefault("Cache.DataLocations", []string{"/run/pelican/cache/data"})
viper.SetDefault("Cache.MetaLocations", []string{"/run/pelican/cache/meta"})
}

viper.SetDefault("LocalCache.RunLocation", filepath.Join("/run", "pelican", "localcache"))

viper.SetDefault("Origin.Multiuser", true)
Expand Down Expand Up @@ -1135,7 +1146,11 @@ func InitServer(ctx context.Context, currentServers ServerType) error {
}
cleanupDirOnShutdown(ctx, runtimeDir)
}
viper.SetDefault("Cache.LocalRoot", filepath.Join(runtimeDir, "cache"))
// To ensure Cache.DataLocation still works, we default Cache.LocalRoot to Cache.DataLocation
// The logic is extracted from handleDeprecatedConfig as we manually set the default value here
viper.SetDefault(param.Cache_DataLocation.GetName(), filepath.Join(runtimeDir, "cache"))
viper.SetDefault(param.Cache_LocalRoot.GetName(), param.Cache_DataLocation.GetString())

if viper.IsSet("Cache.DataLocation") {
viper.SetDefault("Cache.DataLocations", []string{filepath.Join(param.Cache_DataLocation.GetString(), "data")})
viper.SetDefault("Cache.MetaLocations", []string{filepath.Join(param.Cache_DataLocation.GetString(), "meta")})
Expand Down
2 changes: 1 addition & 1 deletion fed_test_utils/fed.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ func NewFedTest(t *testing.T, originConfig string) (ft *FedTest) {
viper.Set("Server.WebPort", 0)
viper.Set("Origin.RunLocation", filepath.Join(tmpPath, "origin"))
viper.Set("Cache.RunLocation", filepath.Join(tmpPath, "cache"))
viper.Set("Cache.DataLocation", filepath.Join(tmpPath, "xcache-data"))
viper.Set("Cache.LocalRoot", filepath.Join(tmpPath, "xcache-data"))
viper.Set("LocalCache.RunLocation", filepath.Join(tmpPath, "local-cache"))
viper.Set("Registry.RequireOriginApproval", false)
viper.Set("Registry.RequireCacheApproval", false)
Expand Down

0 comments on commit 694c735

Please sign in to comment.