Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add federation token endpoint in Director, and implement routine for caches to fetch one #1985

Merged
merged 11 commits into from
Feb 21, 2025
Merged
137 changes: 137 additions & 0 deletions cache/advertise.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,17 @@ import (
"encoding/json"
"net/url"
"strings"
"time"

"github.com/lestrrat-go/jwx/v2/jwt"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
"golang.org/x/sync/errgroup"

"github.com/pelicanplatform/pelican/config"
"github.com/pelicanplatform/pelican/param"
"github.com/pelicanplatform/pelican/server_structs"
"github.com/pelicanplatform/pelican/server_utils"
"github.com/pelicanplatform/pelican/utils"
)

Expand All @@ -40,6 +45,9 @@ type (
}
)

// MinFedTokenTickerRate is the shortest interval allowed between federation token
// refreshes; validateTickerRate raises any shorter deduced refresh period to this value.
// It is a variable rather than a constant so that tests can override the minimum.
var MinFedTokenTickerRate = 1 * time.Minute

func (server *CacheServer) CreateAdvertisement(name, originUrl, originWebUrl string) (*server_structs.OriginAdvertiseV2, error) {
registryPrefix := server_structs.GetCacheNS(param.Xrootd_Sitename.GetString())
ad := server_structs.OriginAdvertiseV2{
Expand Down Expand Up @@ -188,3 +196,132 @@ func (server *CacheServer) GetNamespaceAdsFromDirector() error {
// GetServerType identifies this server as a cache by returning server_structs.CacheType.
func (server *CacheServer) GetServerType() server_structs.ServerType {
return server_structs.CacheType
}

// GetAdTokCfg builds the token configuration this cache uses when talking to the
// Director. The audience is the Director endpoint taken from the federation info,
// the subject is the Cache.Url parameter and the issuer is the Server.IssuerUrl
// parameter.
//
// An error is returned when the federation info cannot be obtained or when it
// does not contain a Director endpoint; in both cases the returned config is
// left at its zero value.
func (server *CacheServer) GetAdTokCfg(ctx context.Context) (adTokCfg server_structs.AdTokCfg, err error) {
	fedInfo, fedErr := config.GetFederation(ctx)
	if fedErr != nil {
		return adTokCfg, errors.Wrap(fedErr, "failed to get federation info")
	}

	// Without a Director endpoint there is nothing to use as the token audience.
	if fedInfo.DirectorEndpoint == "" {
		return adTokCfg, errors.New("unable to determine Director's URL")
	}

	adTokCfg.Audience = fedInfo.DirectorEndpoint
	adTokCfg.Subject = param.Cache_Url.GetString()
	adTokCfg.Issuer = param.Server_IssuerUrl.GetString()
	return adTokCfg, nil
}

// GetFedTokLocation returns the string value of the Cache.FedTokenLocation parameter,
// i.e. the configured location of the cache's federation token.
func (server *CacheServer) GetFedTokLocation() string {
return param.Cache_FedTokenLocation.GetString()
}

// calcTokLifetime returns the lifetime of the serialized JWT tok, computed as the
// difference between its expiration ("exp") and issued-at ("iat") claims.
//
// An error is returned when the token cannot be parsed, when either claim is
// absent, or when the expiration does not fall strictly after the issue time.
// Rejecting those cases matters because an absent claim is reported as the zero
// time.Time, and subtracting from/with the zero time saturates to an enormous
// positive or negative duration that would otherwise be treated as a real lifetime.
func calcTokLifetime(tok string) (time.Duration, error) {
	// Verification-less parsing is fine here, because we already assume a strong
	// trust relationship with the Director, and if it's been compromised, we have
	// bigger problems.
	parsedTok, err := jwt.ParseInsecure([]byte(tok))
	if err != nil {
		return 0, err
	}

	exp, iat := parsedTok.Expiration(), parsedTok.IssuedAt()
	if exp.IsZero() || iat.IsZero() {
		return 0, errors.New("token is missing an expiration or issued-at claim")
	}

	lifetime := exp.Sub(iat)
	if lifetime <= 0 {
		return 0, errors.Errorf("token expiration (%s) is not after its issue time (%s)",
			exp.Format(time.RFC3339), iat.Format(time.RFC3339))
	}
	return lifetime, nil
}

// validateTickerRate acts as a circuit breaker on the federation token refresh
// period: any tickerRate shorter than MinFedTokenTickerRate is raised to that
// minimum (with a warning). The resulting period is then compared against
// tokLifetime purely for diagnostics, and finally logged at debug level.
//
// It returns the period that should actually be used for the ticker.
func validateTickerRate(tickerRate time.Duration, tokLifetime time.Duration) time.Duration {
	rate := tickerRate

	if floor := MinFedTokenTickerRate; rate < floor {
		floorMinutes := floor.Minutes()
		log.Warningf("Deduced federation token refresh period is less than minimum of %.3fm; setting to %.3fm",
			floorMinutes, floorMinutes)
		rate = floor
	}

	// If the Director hands out tokens that live for less than the (possibly
	// clamped) refresh period, all we can do is complain loudly; honoring the
	// shorter period would mean giving up the circuit breaker above.
	if tokLifetime < rate {
		log.Errorf("Deduced federation token refresh period exceeds token lifetime. Tokens will expire before refresh")
	}

	log.Debugf("Federation token refresh rate set to %.3fm", rate.Minutes())
	return rate
}

// getTickerRate derives how often the federation token should be refreshed:
// one third of the lifetime reported by calcTokLifetime for tok. When that
// lifetime cannot be calculated, the Director.FedTokenLifetime parameter is used
// in its place and the failure is logged. The result is always passed through
// validateTickerRate, which enforces the minimum refresh period.
func getTickerRate(tok string) time.Duration {
	lifetime, err := calcTokLifetime(tok)
	if err != nil {
		// Fall back to the configured default lifetime.
		lifetime = param.Director_FedTokenLifetime.GetDuration()
		log.Errorf("Failed to calculate lifetime of federation token: %v.", err)
	}

	return validateTickerRate(lifetime/3, lifetime)
}

// LaunchFedTokManager performs an initial federation token fetch
// (server_utils.CreateFedTok) and install (server_utils.SetFedTok) for cache, then
// starts a goroutine in egrp that repeats the fetch+install every time a ticker fires.
//
// The ticker period comes from getTickerRate, i.e. 1/3 of the token lifetime (or
// 1/3 of the default lifetime when it can't be determined), subject to the minimum
// refresh period. The 1/3 approach leaves some buffer in case the Director is
// briefly unavailable. Whenever a freshly fetched token yields a different period,
// the ticker is reset to it.
//
// Failures to fetch or install a token are logged and never returned; the
// goroutine only exits, returning nil, once ctx is done.
func LaunchFedTokManager(ctx context.Context, egrp *errgroup.Group, cache server_structs.XRootDServer) {
	// Do our initial token fetch+set, then turn things over to the ticker
	tok, err := server_utils.CreateFedTok(ctx, cache)
	if err != nil {
		log.Errorf("Failed to get a federation token: %v", err)
	}

	// In most cases the deduced period and the default-derived period will be the
	// same unless some fed administrator thinks they know better!
	tickerRate := getTickerRate(tok)

	// Set the token in the cache.
	// NOTE(review): when the fetch above failed, whatever CreateFedTok returned is
	// still handed to SetFedTok. That is the pre-existing behavior and is kept
	// as-is here -- confirm whether it is intentional.
	err = server_utils.SetFedTok(ctx, cache, tok)
	if err != nil {
		log.Errorf("Failed to set the federation token: %v", err)
	}

	// TODO: Figure out what to do if the Director starts issuing tokens with a different
	// lifetime --> we can adjust ticker period dynamically, but what's the sensible thing to do?
	fedTokTicker := time.NewTicker(tickerRate)
	egrp.Go(func() error {
		defer fedTokTicker.Stop()
		for {
			select {
			case <-fedTokTicker.C:
				// Time to ask the Director for a new token
				log.Debugln("Refreshing federation token")
				tok, err := server_utils.CreateFedTok(ctx, cache)
				if err != nil {
					// Keep the current token and try again on the next tick.
					log.Errorf("Failed to get a federation token: %v", err)
					continue
				}
				log.Traceln("Successfully received new federation token")

				// Once again, parse the token, use it to set the next ticker fire
				// while also building in a circuit breaker to set a min ticker rate
				newTickerRate := getTickerRate(tok)
				if newTickerRate != tickerRate {
					fedTokTicker.Reset(newTickerRate)
					tickerRate = newTickerRate
				}

				// Set the token in the cache. Only report success when the write
				// actually succeeded, so the trace log can't contradict the error.
				if err := server_utils.SetFedTok(ctx, cache, tok); err != nil {
					log.Errorf("Failed to write the federation token: %v", err)
					continue
				}
				log.Traceln("Successfully wrote new federation token to disk")
			case <-ctx.Done():
				return nil
			}
		}
	})
}
21 changes: 21 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,13 @@ func discoverFederationImpl(ctx context.Context) (fedInfo pelican_url.Federation
fedInfo.BrokerEndpoint = externalUrlStr
}

// Some services in the Director, like federation tokens, may require a defined federation root (discovery URL)
// so for these to have a chance at working in places where a Director truly is acting as the fed root (e.g.
// fed-in-box unit tests), we need to set the discovery endpoint to the director endpoint.
if fedInfo.DiscoveryEndpoint == "" && enabledServers.IsEnabled(server_structs.DirectorType) {
fedInfo.DiscoveryEndpoint = fedInfo.DirectorEndpoint
}

// Make sure any values in global federation metadata are url-parseable
fedInfo.DirectorEndpoint = wrapWithHttpsIfNeeded(fedInfo.DirectorEndpoint)
fedInfo.RegistryEndpoint = wrapWithHttpsIfNeeded(fedInfo.RegistryEndpoint)
Expand All @@ -373,6 +380,9 @@ func discoverFederationImpl(ctx context.Context) (fedInfo pelican_url.Federation
fedInfo.JwksUri = viper.GetString("Federation.JwkUrl")
fedInfo.BrokerEndpoint = viper.GetString("Federation.BrokerUrl")
if fedInfo.DirectorEndpoint != "" && fedInfo.RegistryEndpoint != "" && fedInfo.JwksUri != "" && fedInfo.BrokerEndpoint != "" {
if federationStr != "" {
fedInfo.DiscoveryEndpoint = federationStr
}
return
}

Expand Down Expand Up @@ -1507,6 +1517,17 @@ func InitServer(ctx context.Context, currentServers server_structs.ServerType) e
}
}

// Set fed token locations for cache/origin. Note that fed tokens aren't yet used by the
// Origin (2025-02-04), but they may be soon for things like third party copy.
configDir := viper.GetString("ConfigDir")
if currentServers.IsEnabled(server_structs.OriginType) {
viper.SetDefault(param.Origin_FedTokenLocation.GetName(), filepath.Join(configDir, "origin-fed-token"))
}
if currentServers.IsEnabled(server_structs.CacheType) {
viper.SetDefault(param.Cache_FedTokenLocation.GetName(), filepath.Join(configDir, "cache-fed-token"))
os.Setenv("XRD_PELICANCACHETOKENLOCATION", param.Cache_FedTokenLocation.GetString())
}

// Unmarshal Viper config into a Go struct
unmarshalledConfig, err := param.UnmarshalConfig(viper.GetViper())
if err != nil || unmarshalledConfig == nil {
Expand Down
2 changes: 2 additions & 0 deletions config/resources/defaults.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ Director:
# it seems golang uses 500 - 1000 bytes per entry; a reduction to
# 2k means there will be around 1-2MB of cached data per server.
CachePresenceCapacity: 2000
RegistryQueryInterval: 1m
FedTokenLifetime: 15m
Cache:
DefaultCacheTimeout: "9.5s"
EnablePrefetch: true
Expand Down
26 changes: 18 additions & 8 deletions director/director.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,15 @@ func getLinkDepth(filepath, prefix string) (int, error) {
// Aggregate various request parameters from header and query to a single url.Values struct
func getRequestParameters(req *http.Request) (requestParams url.Values) {
requestParams = url.Values{}

// Start off by passing along any generic query params. If we have any reserved query params
// that we specifically handle from both queries and headers, we'll overwrite them later with a Set().
for key, vals := range req.URL.Query() {
for _, val := range vals {
requestParams.Add(key, val)
}
}

authz := ""
if authzQuery := req.URL.Query()["authz"]; len(authzQuery) > 0 {
authz = authzQuery[0]
Expand All @@ -198,19 +207,19 @@ func getRequestParameters(req *http.Request) (requestParams url.Values) {

// url.Values.Encode will take care of escaping all of these values for us
if authz != "" {
requestParams.Add("authz", authz)
requestParams.Set("authz", authz)
}
if timeout != "" {
requestParams.Add("pelican.timeout", timeout)
requestParams.Set("pelican.timeout", timeout)
}
if skipStat {
requestParams.Add(pelican_url.QuerySkipStat, "")
requestParams.Set(pelican_url.QuerySkipStat, "")
}
if preferCached {
requestParams.Add(pelican_url.QueryPreferCached, "")
requestParams.Set(pelican_url.QueryPreferCached, "")
}
if directRead {
requestParams.Add(pelican_url.QueryDirectRead, "")
requestParams.Set(pelican_url.QueryDirectRead, "")
}
return
}
Expand Down Expand Up @@ -1008,7 +1017,7 @@ func ShortcutMiddleware(defaultResponse string) gin.HandlerFunc {
}
}

func registerServeAd(engineCtx context.Context, ctx *gin.Context, sType server_structs.ServerType) {
func registerServerAd(engineCtx context.Context, ctx *gin.Context, sType server_structs.ServerType) {
ctx.Set("serverType", sType.String())
tokens, present := ctx.Request.Header["Authorization"]
if !present || len(tokens) == 0 {
Expand Down Expand Up @@ -1616,8 +1625,9 @@ func RegisterDirectorAPI(ctx context.Context, router *gin.RouterGroup) {
directorAPIV1.PUT("/origin/*any", redirectToOrigin)
directorAPIV1.DELETE("/origin/*any", redirectToOrigin)
directorAPIV1.Handle("PROPFIND", "/origin/*any", redirectToOrigin)
directorAPIV1.POST("/registerOrigin", serverAdMetricMiddleware, func(gctx *gin.Context) { registerServeAd(ctx, gctx, server_structs.OriginType) })
directorAPIV1.POST("/registerCache", serverAdMetricMiddleware, func(gctx *gin.Context) { registerServeAd(ctx, gctx, server_structs.CacheType) })
directorAPIV1.POST("/registerOrigin", serverAdMetricMiddleware, func(gctx *gin.Context) { registerServerAd(ctx, gctx, server_structs.OriginType) })
directorAPIV1.POST("/registerCache", serverAdMetricMiddleware, func(gctx *gin.Context) { registerServerAd(ctx, gctx, server_structs.CacheType) })
directorAPIV1.GET("/getFedToken", getFedToken)
directorAPIV1.GET("/listNamespaces", listNamespacesV1)
directorAPIV1.GET("/namespaces/prefix/*path", getPrefixByPath)
directorAPIV1.GET("/healthTest/*path", getHealthTestFile)
Expand Down
2 changes: 1 addition & 1 deletion director/director_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ func TestDirectorRegistration(t *testing.T) {
}

setupRequest := func(c *gin.Context, r *gin.Engine, bodyByte []byte, token string, stype server_structs.ServerType) {
r.POST("/", func(gctx *gin.Context) { registerServeAd(ctx, gctx, stype) })
r.POST("/", func(gctx *gin.Context) { registerServerAd(ctx, gctx, stype) })
c.Request, _ = http.NewRequest(http.MethodPost, "/", bytes.NewBuffer(bodyByte))
c.Request.Header.Set("Authorization", "Bearer "+token)
c.Request.Header.Set("Content-Type", "application/json")
Expand Down
Loading
Loading