Skip to content

Commit

Permalink
Merge pull request #978 from haoming29/authed-self-test-cache
Browse files Browse the repository at this point in the history
Authenticated self-tests for cache
  • Loading branch information
turetske authored Apr 1, 2024
2 parents 2105b05 + 596ddfa commit 5dd6572
Show file tree
Hide file tree
Showing 10 changed files with 145 additions and 73 deletions.
70 changes: 50 additions & 20 deletions cache/self_monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/pelicanplatform/pelican/config"
"github.com/pelicanplatform/pelican/metrics"
"github.com/pelicanplatform/pelican/param"
"github.com/pelicanplatform/pelican/token"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
"golang.org/x/sync/errgroup"
Expand Down Expand Up @@ -76,7 +77,7 @@ func generateTestFile() (string, error) {
monitoringPath := filepath.Join(basePath, selfTestDir)
_, err := os.Stat(monitoringPath)
if err != nil {
return "", errors.Wrap(err, "Self-test directory does not exist")
return "", errors.Wrap(err, "self-test directory does not exist at "+monitoringPath)
}
uid, err := config.GetDaemonUID()
if err != nil {
Expand Down Expand Up @@ -116,71 +117,97 @@ func generateTestFile() (string, error) {

file, err := os.OpenFile(finalFilePath, os.O_WRONLY|os.O_CREATE, 0600)
if err != nil {
return "", errors.Wrapf(err, "Failed to create self-test file %s", finalFilePath)
return "", errors.Wrapf(err, "failed to create self-test file %s", finalFilePath)
}
defer file.Close()

cinfoFile, err := os.OpenFile(tmpFileCinfoPath, os.O_WRONLY|os.O_CREATE, 0600)
if err != nil {
return "", errors.Wrapf(err, "Failed to create self-test cinfo file %s", tmpFileCinfoPath)
return "", errors.Wrapf(err, "failed to create self-test cinfo file %s", tmpFileCinfoPath)
}
defer cinfoFile.Close()

if _, err := file.Write(testFileBytes); err != nil {
return "", errors.Wrapf(err, "Failed to write test content to self-test file %s", finalFilePath)
return "", errors.Wrapf(err, "failed to write test content to self-test file %s", finalFilePath)
}
if _, err := cinfoFile.Write(cinfoBytes); err != nil {
return "", errors.Wrapf(err, "Failed to write cinfo content to self-test cinfo file %s", tmpFileCinfoPath)
return "", errors.Wrapf(err, "failed to write cinfo content to self-test cinfo file %s", tmpFileCinfoPath)
}

if err = file.Chown(uid, gid); err != nil {
return "", errors.Wrapf(err, "Unable to change ownership of self-test file %v to desired daemon gid %v", file, gid)
return "", errors.Wrapf(err, "unable to change ownership of self-test file %v to desired daemon gid %v", file, gid)
}
if err = cinfoFile.Chown(uid, gid); err != nil {
return "", errors.Wrapf(err, "Unable to change ownership of self-test cinfo file %v to desired daemon gid %v", file, gid)
return "", errors.Wrapf(err, "unable to change ownership of self-test cinfo file %v to desired daemon gid %v", file, gid)
}

if err := os.Rename(tmpFileCinfoPath, finalFileCinfoPath); err != nil {
return "", errors.Wrapf(err, "Unable to move self-test cinfo file from temp location %q to desired location %q", tmpFileCinfoPath, finalFileCinfoPath)
return "", errors.Wrapf(err, "unable to move self-test cinfo file from temp location %q to desired location %q", tmpFileCinfoPath, finalFileCinfoPath)
}

cachePort := param.Cache_Port.GetInt()
baseUrlStr := fmt.Sprintf("https://%s:%d", param.Server_Hostname.GetString(), cachePort)
baseUrl, err := url.Parse(baseUrlStr)
if err != nil {
return "", errors.Wrap(err, "Failed to validate the base url for self-test download")
return "", errors.Wrap(err, "failed to validate the base url for self-test download")
}
baseUrl.Path = extFilePath

return baseUrl.String(), nil
}

func generateFileTestScitoken() (string, error) {
issuerUrl := param.Server_ExternalWebUrl.GetString()
if issuerUrl == "" { // if both are empty, then error
return "", errors.New("failed to create token: invalid iss, Server_ExternalWebUrl is empty")
}
fTestTokenCfg := token.NewWLCGToken()
fTestTokenCfg.Lifetime = time.Minute
fTestTokenCfg.Issuer = issuerUrl
fTestTokenCfg.Subject = "cache"
fTestTokenCfg.Claims = map[string]string{"scope": "storage.read:/pelican/monitoring"}
// For self-tests, the audience is the server itself
fTestTokenCfg.AddAudienceAny()

// CreateToken also handles validation for us
tok, err := fTestTokenCfg.CreateToken()
if err != nil {
return "", errors.Wrap(err, "failed to create file test token")
}

return tok, nil
}

func downloadTestFile(ctx context.Context, fileUrl string) error {
tkn, err := generateFileTestScitoken()
if err != nil {
return errors.Wrap(err, "failed to create a token for cache self-test download")
}

req, err := http.NewRequestWithContext(ctx, "GET", fileUrl, nil)
if err != nil {
return errors.Wrap(err, "Failed to create GET request for test file transfer download")
return errors.Wrap(err, "failed to create GET request for cache self-test download")
}

// TODO: add auth for self-test and remove the public export in Authfile
// once we have a local issuer up for cache (should be doable once #816 is merged)
req.Header.Set("Authorization", "Bearer "+tkn)

client := http.Client{Transport: config.GetTransport()}

resp, err := client.Do(req)
if err != nil {
return errors.Wrap(err, "Failed to start request for test file transfer download")
return errors.Wrap(err, "failed to start request for cache self-test download")
}
defer resp.Body.Close()

if resp.StatusCode > 299 {
return errors.Errorf("Error response %v from test file transfer download: %v", resp.StatusCode, resp.Status)
return errors.Errorf("error response %v from cache self-test download: %v", resp.StatusCode, resp.Status)
}
body, err := io.ReadAll(resp.Body)
if err != nil {
return errors.Wrap(err, "Failed to get response body from test file transfer download")
return errors.Wrap(err, "failed to get response body from cache self-test download")
}
if string(body) != selfTestBody {
return errors.Errorf("Contents of test file transfer body do not match upload: %v", body)
return errors.Errorf("contents of cache self-test file do not match the one uploaded: %v", body)
}

return nil
Expand Down Expand Up @@ -216,11 +243,11 @@ func runSelfTest(ctx context.Context) (bool, error) {
}
err = downloadTestFile(ctx, fileUrl)
if err != nil {
err = deleteTestFile(fileUrl)
if err != nil {
return false, errors.Wrap(err, "self-test failed during delete")
errDel := deleteTestFile(fileUrl)
if errDel != nil {
return false, errors.Wrap(errDel, "self-test failed during delete")
}
return false, errors.Wrap(err, "self-test failed during download")
return false, errors.Wrap(err, "self-test failed during download. File is cleaned up at "+fileUrl)
}
err = deleteTestFile(fileUrl)
if err != nil {
Expand All @@ -237,6 +264,9 @@ func doSelfMonitor(ctx context.Context) {
if ok && err == nil {
log.Debugln("Self-test monitoring cycle succeeded at", time.Now().Format(time.UnixDate))
metrics.SetComponentHealthStatus(metrics.OriginCache_XRootD, metrics.StatusOK, "Self-test monitoring cycle succeeded at "+time.Now().Format(time.RFC3339))
} else if !ok && err == nil {
log.Warningln("Self-test monitoring cycle failed with unknown error")
metrics.SetComponentHealthStatus(metrics.OriginCache_XRootD, metrics.StatusCritical, "Self-test monitoring cycle failed with unknown err")
} else {
log.Warningln("Self-test monitoring cycle failed: ", err)
metrics.SetComponentHealthStatus(metrics.OriginCache_XRootD, metrics.StatusCritical, "Self-test monitoring cycle failed: "+err.Error())
Expand Down
10 changes: 8 additions & 2 deletions launchers/cache_serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,17 +28,18 @@ import (
"github.com/gin-gonic/gin"
"github.com/pelicanplatform/pelican/broker"
"github.com/pelicanplatform/pelican/cache"
"github.com/pelicanplatform/pelican/config"
"github.com/pelicanplatform/pelican/daemon"
"github.com/pelicanplatform/pelican/launcher_utils"
"github.com/pelicanplatform/pelican/param"
"github.com/pelicanplatform/pelican/server_structs"
"github.com/pelicanplatform/pelican/server_utils"
"github.com/pelicanplatform/pelican/xrootd"
log "github.com/sirupsen/logrus"
"golang.org/x/sync/errgroup"
)

func CacheServe(ctx context.Context, engine *gin.Engine, egrp *errgroup.Group) (server_structs.XRootDServer, error) {

func CacheServe(ctx context.Context, engine *gin.Engine, egrp *errgroup.Group, modules config.ServerType) (server_structs.XRootDServer, error) {
err := xrootd.SetUpMonitoring(ctx, egrp)
if err != nil {
return nil, err
Expand Down Expand Up @@ -77,6 +78,11 @@ func CacheServe(ctx context.Context, engine *gin.Engine, egrp *errgroup.Group) (
cache.PeriodicCacheSelfTest(ctx, egrp)
}

// Director and origin also registers this metadata URL; avoid registering twice.
if !modules.IsEnabled(config.DirectorType) && !modules.IsEnabled(config.OriginType) {
server_utils.RegisterOIDCAPI(engine)
}

log.Info("Launching cache")
launchers, err := xrootd.ConfigureLaunchers(false, configPath, false, true)
if err != nil {
Expand Down
7 changes: 5 additions & 2 deletions launchers/cache_serve_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,15 @@ import (
"context"

"github.com/gin-gonic/gin"
"github.com/pelicanplatform/pelican/server_structs"
"github.com/pkg/errors"
"golang.org/x/sync/errgroup"

"github.com/pelicanplatform/pelican/config"
"github.com/pelicanplatform/pelican/server_structs"
)

func CacheServe(ctx context.Context, engine *gin.Engine, egrp *errgroup.Group) (server_structs.XRootDServer, error) {

func CacheServe(ctx context.Context, engine *gin.Engine, egrp *errgroup.Group, modules config.ServerType) (server_structs.XRootDServer, error) {
return nil, errors.New("Cache module is not supported on Windows")
}

Expand Down
2 changes: 1 addition & 1 deletion launchers/launcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,7 @@ func LaunchModules(ctx context.Context, modules config.ServerType) (context.Canc
log.Errorln("Director does not seem to be working:", err)
return shutdownCancel, err
}
server, err := CacheServe(ctx, engine, egrp)
server, err := CacheServe(ctx, engine, egrp, modules)
if err != nil {
return shutdownCancel, err
}
Expand Down
4 changes: 1 addition & 3 deletions launchers/origin_serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,7 @@ func OriginServe(ctx context.Context, engine *gin.Engine, egrp *errgroup.Group,

// Director also registers this metadata URL; avoid registering twice.
if !modules.IsEnabled(config.DirectorType) {
if err = origin.RegisterOriginOIDCAPI(engine.Group("/.well-known")); err != nil {
return nil, err
}
server_utils.RegisterOIDCAPI(engine)
}

if param.Origin_EnableIssuer.GetBool() {
Expand Down
33 changes: 0 additions & 33 deletions origin/origin.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,52 +20,19 @@ package origin

import (
"context"
"encoding/json"
"net/http"
"net/url"

"github.com/gin-gonic/gin"
"github.com/pkg/errors"
"golang.org/x/sync/errgroup"

"github.com/pelicanplatform/pelican/config"
"github.com/pelicanplatform/pelican/metrics"
"github.com/pelicanplatform/pelican/param"
"github.com/pelicanplatform/pelican/server_utils"
)

var (
notificationChan = make(chan bool)
)

func exportOpenIDConfig(c *gin.Context) {
issuerURL, _ := url.Parse(param.Server_ExternalWebUrl.GetString())
jwksUri, _ := url.JoinPath(issuerURL.String(), "/.well-known/issuer.jwks")
jsonData := gin.H{
"issuer": issuerURL.String(),
"jwks_uri": jwksUri,
}

c.JSON(http.StatusOK, jsonData)
}

func exportIssuerJWKS(c *gin.Context) {
keys, _ := config.GetIssuerPublicJWKS()
buf, _ := json.MarshalIndent(keys, "", " ")

c.Data(http.StatusOK, "application/json; charset=utf-8", buf)
}

func RegisterOriginOIDCAPI(router *gin.RouterGroup) error {
if router == nil {
return errors.New("Origin configuration passed a nil pointer")
}

router.GET("/openid-configuration", exportOpenIDConfig)
router.GET("/issuer.jwks", exportIssuerJWKS)
return nil
}

// Configure API endpoints for origin that are not tied to UI
func RegisterOriginAPI(router *gin.Engine, ctx context.Context, egrp *errgroup.Group) error {
if router == nil {
Expand Down
56 changes: 56 additions & 0 deletions server_utils/oidc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/***************************************************************
*
* Copyright (C) 2024, Pelican Project, Morgridge Institute for Research
*
* Licensed under the Apache License, Version 2.0 (the "License"); you
* may not use this file except in compliance with the License. You may
* obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
***************************************************************/

package server_utils

import (
"encoding/json"
"net/http"
"net/url"

"github.com/gin-gonic/gin"

"github.com/pelicanplatform/pelican/config"
"github.com/pelicanplatform/pelican/param"
)

func exportOpenIDConfig(c *gin.Context) {
issuerURL, _ := url.Parse(param.Server_ExternalWebUrl.GetString())
jwksUri, _ := url.JoinPath(issuerURL.String(), "/.well-known/issuer.jwks")
jsonData := gin.H{
"issuer": issuerURL.String(),
"jwks_uri": jwksUri,
}

c.JSON(http.StatusOK, jsonData)
}

func exportIssuerJWKS(c *gin.Context) {
keys, _ := config.GetIssuerPublicJWKS()
buf, _ := json.MarshalIndent(keys, "", " ")

c.Data(http.StatusOK, "application/json; charset=utf-8", buf)
}

func RegisterOIDCAPI(engine *gin.Engine) {
group := engine.Group("/.well-known")
{
group.GET("/openid-configuration", exportOpenIDConfig)
group.GET("/issuer.jwks", exportIssuerJWKS)
}
}
32 changes: 22 additions & 10 deletions xrootd/authorization.go
Original file line number Diff line number Diff line change
Expand Up @@ -304,10 +304,6 @@ func EmitAuthfile(server server_structs.XRootDServer) error {
}
}

output.Write([]byte(outStr + strings.Join(words[2:], " ") + "\n"))
} else if server.GetServerType().IsEnabled(config.CacheType) && param.Cache_SelfTest.GetBool() {
// Set up cache self-test public read
outStr := "u * /pelican/monitoring lr "
output.Write([]byte(outStr + strings.Join(words[2:], " ") + "\n"))
} else {
output.Write([]byte(lineContents + " "))
Expand Down Expand Up @@ -343,11 +339,7 @@ func EmitAuthfile(server server_structs.XRootDServer) error {
// If nothing has been written to the output yet
var outStr string
if !foundPublicLine {
if param.Cache_SelfTest.GetBool() {
outStr = "u * /pelican/monitoring lr "
} else {
outStr = "u * "
}
outStr = "u * "
}
for _, ad := range server.GetNamespaceAds() {
if ad.PublicRead && ad.Path != "" {
Expand Down Expand Up @@ -545,7 +537,27 @@ func EmitScitokensConfig(server server_structs.XRootDServer) error {
}
return WriteOriginScitokensConfig(authedPrefixes)
} else if cacheServer, ok := server.(*cache.CacheServer); ok {
return WriteCacheScitokensConfig(cacheServer.GetNamespaceAds())
directorAds := cacheServer.GetNamespaceAds()
if param.Cache_SelfTest.GetBool() {
localIssuer, err := url.Parse(param.Server_ExternalWebUrl.GetString())
if err != nil {
log.Error("Can't parse Server_ExternalWebUrl when generating scitokens config: ", err)
return errors.Wrap(err, "can't parse Server_ExternalWebUrl when generating scitokens config")
}
cacheIssuer := server_structs.NamespaceAdV2{
PublicRead: false,
Caps: server_structs.Capabilities{PublicReads: false, Reads: true, Writes: true},
Path: "/pelican/monitoring",
Issuer: []server_structs.TokenIssuer{
{
BasePaths: []string{"/pelican/monitoring"},
IssuerUrl: *localIssuer,
},
},
}
directorAds = append(directorAds, cacheIssuer)
}
return WriteCacheScitokensConfig(directorAds)
} else {
return errors.New("Internal error: server object is neither cache nor origin")
}
Expand Down
3 changes: 1 addition & 2 deletions xrootd/origin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,7 @@ func originMockup(ctx context.Context, egrp *errgroup.Group, t *testing.T) conte
engine, err := web_ui.GetEngine()
require.NoError(t, err)

err = origin.RegisterOriginOIDCAPI(engine.Group("/.well-known"))
require.NoError(t, err)
server_utils.RegisterOIDCAPI(engine)

shutdownCtx, shutdownCancel := context.WithCancel(context.Background())
defer func() {
Expand Down
Loading

0 comments on commit 5dd6572

Please sign in to comment.