From 06d589d4b50ee5a25b0567c62272e47b3b7da44b Mon Sep 17 00:00:00 2001 From: Justin Hiemstra Date: Tue, 4 Feb 2025 18:45:12 +0000 Subject: [PATCH 01/11] Add "federation token" endpoint to the Director This new endpoint allows caches or origins to get a federation-signed token to provide to other federation services that may require proof of federation membership. The immediate use case is for caches to provide it to Origins that haven't enabled DirectReads, although I think this may also be useful for a future where we support third-party copies between origins. I tried to add a little bit of flexibility to the endpoint to support that use case, even though we don't have the immediate need. The authz scheme here is that a server requesting a fed token must provide its hostname, server type and a valid advertisement token to the Director. The Director then verifies the advertisement token, which is de facto proof of federation membership, and if valid issues a personally-signed token. --- config/resources/defaults.yaml | 2 + director/director.go | 26 ++- director/director_test.go | 2 +- director/fed_token.go | 206 +++++++++++++++++++++++ director/fed_token_test.go | 287 +++++++++++++++++++++++++++++++++ docs/parameters.yaml | 8 + param/parameters.go | 1 + param/parameters_struct.go | 2 + registry/registry.go | 7 +- server_structs/auth.go | 5 + 10 files changed, 531 insertions(+), 15 deletions(-) create mode 100644 director/fed_token.go create mode 100644 director/fed_token_test.go diff --git a/config/resources/defaults.yaml b/config/resources/defaults.yaml index 786ed779c..db136bf3c 100644 --- a/config/resources/defaults.yaml +++ b/config/resources/defaults.yaml @@ -52,6 +52,8 @@ Director: # it seems golang uses 500 - 1000 bytes per entry; a reduction to # 2k means there will be around 1-2MB of cached data per server. 
CachePresenceCapacity: 2000 + RegistryQueryInterval: 1m + FedTokenLifetime: 15m Cache: DefaultCacheTimeout: "9.5s" EnablePrefetch: true diff --git a/director/director.go b/director/director.go index a5fc35178..0678e48bd 100644 --- a/director/director.go +++ b/director/director.go @@ -175,6 +175,15 @@ func getLinkDepth(filepath, prefix string) (int, error) { // Aggregate various request parameters from header and query to a single url.Values struct func getRequestParameters(req *http.Request) (requestParams url.Values) { requestParams = url.Values{} + + // Start off by passing along any generic query params. If we have any reserved query params + // that we specifically handle from both queries and headers, we'll overwrite them later with a Set(). + for key, vals := range req.URL.Query() { + for _, val := range vals { + requestParams.Add(key, val) + } + } + authz := "" if authzQuery := req.URL.Query()["authz"]; len(authzQuery) > 0 { authz = authzQuery[0] @@ -198,19 +207,19 @@ func getRequestParameters(req *http.Request) (requestParams url.Values) { // url.Values.Encode will help us escape all them if authz != "" { - requestParams.Add("authz", authz) + requestParams.Set("authz", authz) } if timeout != "" { - requestParams.Add("pelican.timeout", timeout) + requestParams.Set("pelican.timeout", timeout) } if skipStat { - requestParams.Add(pelican_url.QuerySkipStat, "") + requestParams.Set(pelican_url.QuerySkipStat, "") } if preferCached { - requestParams.Add(pelican_url.QueryPreferCached, "") + requestParams.Set(pelican_url.QueryPreferCached, "") } if directRead { - requestParams.Add(pelican_url.QueryDirectRead, "") + requestParams.Set(pelican_url.QueryDirectRead, "") } return } @@ -1008,7 +1017,7 @@ func ShortcutMiddleware(defaultResponse string) gin.HandlerFunc { } } -func registerServeAd(engineCtx context.Context, ctx *gin.Context, sType server_structs.ServerType) { +func registerServerAd(engineCtx context.Context, ctx *gin.Context, sType server_structs.ServerType) { 
ctx.Set("serverType", sType.String()) tokens, present := ctx.Request.Header["Authorization"] if !present || len(tokens) == 0 { @@ -1616,8 +1625,9 @@ func RegisterDirectorAPI(ctx context.Context, router *gin.RouterGroup) { directorAPIV1.PUT("/origin/*any", redirectToOrigin) directorAPIV1.DELETE("/origin/*any", redirectToOrigin) directorAPIV1.Handle("PROPFIND", "/origin/*any", redirectToOrigin) - directorAPIV1.POST("/registerOrigin", serverAdMetricMiddleware, func(gctx *gin.Context) { registerServeAd(ctx, gctx, server_structs.OriginType) }) - directorAPIV1.POST("/registerCache", serverAdMetricMiddleware, func(gctx *gin.Context) { registerServeAd(ctx, gctx, server_structs.CacheType) }) + directorAPIV1.POST("/registerOrigin", serverAdMetricMiddleware, func(gctx *gin.Context) { registerServerAd(ctx, gctx, server_structs.OriginType) }) + directorAPIV1.POST("/registerCache", serverAdMetricMiddleware, func(gctx *gin.Context) { registerServerAd(ctx, gctx, server_structs.CacheType) }) + directorAPIV1.GET("/getFedToken", getFedToken) directorAPIV1.GET("/listNamespaces", listNamespacesV1) directorAPIV1.GET("/namespaces/prefix/*path", getPrefixByPath) directorAPIV1.GET("/healthTest/*path", getHealthTestFile) diff --git a/director/director_test.go b/director/director_test.go index e5d293437..5b2f11bd9 100644 --- a/director/director_test.go +++ b/director/director_test.go @@ -239,7 +239,7 @@ func TestDirectorRegistration(t *testing.T) { } setupRequest := func(c *gin.Context, r *gin.Engine, bodyByte []byte, token string, stype server_structs.ServerType) { - r.POST("/", func(gctx *gin.Context) { registerServeAd(ctx, gctx, stype) }) + r.POST("/", func(gctx *gin.Context) { registerServerAd(ctx, gctx, stype) }) c.Request, _ = http.NewRequest(http.MethodPost, "/", bytes.NewBuffer(bodyByte)) c.Request.Header.Set("Authorization", "Bearer "+token) c.Request.Header.Set("Content-Type", "application/json") diff --git a/director/fed_token.go b/director/fed_token.go new file mode 100644 index 
000000000..26abacc75 --- /dev/null +++ b/director/fed_token.go @@ -0,0 +1,206 @@ +/*************************************************************** +* +* Copyright (C) 2025, Pelican Project, Morgridge Institute for Research +* +* Licensed under the Apache License, Version 2.0 (the "License"); you +* may not use this file except in compliance with the License. You may +* obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +* +***************************************************************/ + +package director + +import ( + "fmt" + "net/http" + + "github.com/gin-gonic/gin" + "github.com/pkg/errors" + log "github.com/sirupsen/logrus" + + "github.com/pelicanplatform/pelican/config" + "github.com/pelicanplatform/pelican/param" + "github.com/pelicanplatform/pelican/server_structs" + "github.com/pelicanplatform/pelican/token" + "github.com/pelicanplatform/pelican/token_scopes" +) + +type requestInfo struct { + Host string + SType server_structs.ServerType + Tok string +} + +func validateFedTokRequest(ginCtx *gin.Context) (rInfo requestInfo, err error) { + // Parse the incoming request parameters, from which we will extract the token + // and the hostname of the cache. + // NOTE -- this function will also grab tokens from an Authorization header and pass + // them back as a request parameter. 
+ reqParams := getRequestParameters(ginCtx.Request) + hNames, exists := reqParams["host"] + if !exists || len(hNames) == 0 { + err = fmt.Errorf("no hostname found in the 'host' url parameter") + return + } else if len(hNames) > 1 { + err = fmt.Errorf("multiple hostnames found in the 'host' url parameter") + return + } + rInfo.Host = hNames[0] + + sTypes, exists := reqParams["sType"] + var sType server_structs.ServerType + if !exists || len(sTypes) == 0 { + err = fmt.Errorf("host '%s' generated request with no server type found in the 'sType' url parameter", rInfo.Host) + return + } else if len(sTypes) > 1 { + err = fmt.Errorf("host '%s' generated request with multiple server types in the 'sType' url parameter", rInfo.Host) + return + } + valid := sType.SetString(sTypes[0]) + if !valid || (sType != server_structs.CacheType && sType != server_structs.OriginType) { + err = fmt.Errorf("host '%s' generated request with invalid server type '%s' as value of 'sType' url parameter", rInfo.Host, sTypes[0]) + return + } + rInfo.SType = sType + + // Note that our getRequestParameters function will also check the Authorization header, but multiple tokens are stripped + // such that we only look at the first one. + tok, exists := reqParams["authz"] + if !exists || len(tok) == 0 { + err = fmt.Errorf("host '%s' generated request with no authorization token in 'Authorization' header or 'authz' url parameter", rInfo.Host) + return + } + rInfo.Tok = tok[0] + + return +} + +func createFedTok(ginCtx *gin.Context, rInfo requestInfo) (tok string, err error) { + // The federation token will be signed by the Director on behalf of the federation, so + // we still use the Discovery endpoint as the issuer. 
+ fed, err := config.GetFederation(ginCtx) + if err != nil { + err = errors.Wrap(err, "federation issuer could not be determined") + return + } + if fed.DiscoveryEndpoint == "" { + err = errors.New("federation issuer is not set") + return + } + fToken := token.NewWLCGToken() + fToken.Lifetime = param.Director_FedTokenLifetime.GetDuration() + fToken.Subject = rInfo.Host + fToken.Issuer = fed.DiscoveryEndpoint + // This token is meant to be consumed by any origin in the system. However, without + // knowing every origin in the system ahead of time, we can't add them all so we + // use the more permissive "ANY" audience. + fToken.AddAudienceAny() + + // The token should be scoped such that the cache only has permission for the namespaces + // indicated by the Director + allowedPrefixesPtr := allowedPrefixesForCaches.Load() + if allowedPrefixesPtr == nil { + err = errors.New("the Director could not determine allowed prefixes for the provided host") + return + } + allowedPrefixes := *allowedPrefixesPtr + + hostPrefixes, exists := allowedPrefixes[rInfo.Host] + if !exists { + // If there are no prefixes, we assume the cache is configured to read all namespaces + hostPrefixes = map[string]struct{}{"/": {}} + } + + scopes := make([]token_scopes.TokenScope, 0, len(allowedPrefixes[rInfo.Host])) + for prefix := range hostPrefixes { + var readScope token_scopes.TokenScope + readScope, err = token_scopes.Storage_Read.Path(prefix) + if err != nil { + err = errors.Wrap(err, "token scopes could not be created") + return + } + scopes = append(scopes, readScope) + } + fToken.AddScopes(scopes...) 
+ + tok, err = fToken.CreateToken() + if err != nil { + err = errors.Wrap(err, "could not create/sign token") + return + } + + return +} + +func getFedToken(ginCtx *gin.Context) { + rInfo, err := validateFedTokRequest(ginCtx) + if err != nil { + log.Debugf("Error validating incoming request: %s", err) + ginCtx.JSON(http.StatusForbidden, server_structs.SimpleApiResp{ + Status: server_structs.RespFailed, + Msg: err.Error(), + }) + return + } + + // Validate the token by talking to the Registry. Note that server type has already been validated + // and is either Cache or Origin. + var registryPrefix string + if rInfo.SType == server_structs.CacheType { + registryPrefix = fmt.Sprintf("%s%s", server_structs.CachePrefix, rInfo.Host) + } else if rInfo.SType == server_structs.OriginType { + registryPrefix = fmt.Sprintf("%s%s", server_structs.OriginPrefix, rInfo.Host) + } + // Any token that grants authorization to advertise within a federation should be enough + // to determine that the server is part of the federation. 
+ if ok, err := verifyAdvertiseToken(ginCtx, rInfo.Tok, registryPrefix); err != nil { + if errors.Is(err, adminApprovalErr) { + log.Debugf("Host '%s' has not been approved by an administrator", rInfo.Host) + ginCtx.JSON(http.StatusForbidden, server_structs.SimpleApiResp{ + Status: server_structs.RespFailed, + Msg: fmt.Sprintf("Host '%s' has not been approved by an administrator", rInfo.Host), + }) + return + } + + // An otherwise unexpected error occurred + log.Warningf("Failed to verify advertise token from host '%s': %v", rInfo.Host, err) + ginCtx.JSON(http.StatusInternalServerError, server_structs.SimpleApiResp{ + Status: server_structs.RespFailed, + Msg: "Failed to verify advertise token", + }) + return + } else if !ok { + // We read the token, but we don't like it + log.Debugf("Advertise token from host '%s' was rejected", rInfo.Host) + ginCtx.JSON(http.StatusForbidden, server_structs.SimpleApiResp{ + Status: server_structs.RespFailed, + Msg: "The provided advertise token was rejected", + }) + return + } + + // We've validated the incoming token and decided to issue the federation token + tok, err := createFedTok(ginCtx, rInfo) + if err != nil { + log.Warningf("Failed to create federation token for host '%s': %v", rInfo.Host, err) + ginCtx.JSON(http.StatusInternalServerError, server_structs.SimpleApiResp{ + Status: server_structs.RespFailed, + Msg: "Attempted to create federation token but failed unexpectedly", + }) + return + } + + // Respond with the token + ginCtx.JSON(http.StatusOK, server_structs.TokenResponse{ + AccessToken: tok, + }) +} diff --git a/director/fed_token_test.go b/director/fed_token_test.go new file mode 100644 index 000000000..d8bd7f44b --- /dev/null +++ b/director/fed_token_test.go @@ -0,0 +1,287 @@ +/*************************************************************** +* +* Copyright (C) 2025, Pelican Project, Morgridge Institute for Research +* +* Licensed under the Apache License, Version 2.0 (the "License"); you +* may not use this file 
except in compliance with the License. You may +* obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +* +***************************************************************/ + +package director + +import ( + "net/http/httptest" + "net/url" + "path/filepath" + "strings" + "testing" + + "github.com/gin-gonic/gin" + "github.com/lestrrat-go/jwx/v2/jwt" + "github.com/spf13/viper" + "github.com/stretchr/testify/assert" + + "github.com/pelicanplatform/pelican/config" + "github.com/pelicanplatform/pelican/param" + "github.com/pelicanplatform/pelican/pelican_url" + "github.com/pelicanplatform/pelican/server_structs" + "github.com/pelicanplatform/pelican/server_utils" +) + +func TestValidateRequest(t *testing.T) { + testCases := []struct { + name string + host []string // Using slices here so we can trigger errors on purpose + sType []string + tok string + authFromHeader bool + expectErr bool + errStr string + }{ + { + name: "Valid request", + host: []string{"cache1"}, + sType: []string{"Cache"}, + tok: "token1", + authFromHeader: false, + expectErr: false, + errStr: "", + }, + { + name: "No hostname", + host: []string{}, + sType: []string{"Cache"}, + tok: "token1", + authFromHeader: false, + expectErr: true, + errStr: "no hostname found in the 'host' url parameter", + }, + { + name: "Multiple hostnames", + host: []string{"cache1", "cache2"}, + sType: []string{"Cache"}, + tok: "token1", + authFromHeader: false, + expectErr: true, + errStr: "multiple hostnames found in the 'host' url parameter", + }, + { + name: "No server type", + host: []string{"cache1"}, + sType: []string{}, + tok: "token1", + authFromHeader: false, + expectErr: 
true, + errStr: "host 'cache1' generated request with no server type found in the 'sType' url parameter", + }, + { + name: "Invalid server type", + host: []string{"cache1"}, + sType: []string{"Invalid"}, + tok: "token1", + authFromHeader: false, + expectErr: true, + errStr: "host 'cache1' generated request with invalid server type 'Invalid' as value of 'sType' url parameter", + }, + { + name: "Multiple server types", + host: []string{"cache1"}, + sType: []string{"Cache", "Origin"}, + tok: "token1", + authFromHeader: false, + expectErr: true, + errStr: "host 'cache1' generated request with multiple server types in the 'sType' url parameter", + }, + { + name: "No token", + host: []string{"cache1"}, + sType: []string{"Cache"}, + tok: "", + authFromHeader: false, + expectErr: true, + errStr: "host 'cache1' generated request with no authorization token in 'Authorization' header or 'authz' url parameter", + }, + } + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + w := httptest.NewRecorder() + c, _ := gin.CreateTestContext(w) + + params := url.Values{} + if len(tc.host) > 0 { + for _, h := range tc.host { + params.Add("host", h) + } + } + if len(tc.sType) > 0 { + for _, st := range tc.sType { + params.Add("sType", st) + } + } + if !tc.authFromHeader && tc.tok != "" { + params.Add("authz", tc.tok) + } + + req := httptest.NewRequest("GET", "/test?"+params.Encode(), nil) + + // Add authorization headers AFTER request creation -- they will be overwritten otherwise + if tc.authFromHeader && tc.tok != "" { + req.Header.Add("Authorization", tc.tok) + } + c.Request = req + + rInfo, err := validateFedTokRequest(c) + + if tc.expectErr { + assert.Error(t, err) + assert.Contains(t, err.Error(), tc.errStr) + return + } else { + assert.NoError(t, err) + } + + assert.Equal(t, rInfo.Host, tc.host[0]) + assert.Equal(t, rInfo.SType.String(), tc.sType[0]) + assert.Equal(t, tc.tok, rInfo.Tok) + }) + } +} + +func parseJWT(tokenString string) (scopes []string, issuer 
string, err error) { + // Parse without verification + tok, err := jwt.ParseInsecure([]byte(tokenString)) + if err != nil { + return nil, "", err + } + + issuer = tok.Issuer() + if raw, exists := tok.Get("scope"); exists { + if scopeStr, ok := raw.(string); ok { + scopes = strings.Split(scopeStr, " ") + } + } + + return scopes, issuer, nil +} + +func TestCreateFedTok(t *testing.T) { + server_utils.ResetTestState() + defer server_utils.ResetTestState() + + kDir := filepath.Join(t.TempDir(), "keys") + viper.Set(param.IssuerKeysDirectory.GetName(), kDir) + viper.Set("ConfigDir", t.TempDir()) + + testCases := []struct { + name string + host string + sType server_structs.ServerType + discoveryUrl string + allowedPrefixes map[string]map[string]struct{} + expectErr bool + errContains string + }{ + { + name: "Valid request", + host: "test-cache.example.com", + sType: server_structs.CacheType, + discoveryUrl: "https://my-federation.com", + allowedPrefixes: map[string]map[string]struct{}{ + "test-cache.example.com": { + "/foo": struct{}{}, + "/bar": struct{}{}, + }, + "different-cache.example.com": { + "/baz": struct{}{}, + }, + }, + expectErr: false, + }, + { + name: "Malformed discovery config", + host: "test-cache.example.com", + sType: server_structs.CacheType, + discoveryUrl: "", + allowedPrefixes: map[string]map[string]struct{}{}, + expectErr: true, + errContains: "federation issuer is not set", + }, + { + name: "No allowed prefixes defaults to root of namespace", + host: "test-cache.example.com", + sType: server_structs.CacheType, + discoveryUrl: "https://my-federation.com", + allowedPrefixes: map[string]map[string]struct{}{ + "different-cache.example.com": { + "/baz": struct{}{}, + }, + }, + expectErr: false, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + // config.ResetFederationForTest() + w := httptest.NewRecorder() + c, _ := gin.CreateTestContext(w) + + config.ResetFederationForTest() + fed := pelican_url.FederationDiscovery{ 
+ // Most of these aren't actually used by the test, but to prevent auto discovery + // and needing to spin up a separate mock discovery server, set them all. + DiscoveryEndpoint: tc.discoveryUrl, + DirectorEndpoint: "https://dne-director.com", + RegistryEndpoint: "https://dne-registry.com", + JwksUri: "https://dne-jwks.com", + BrokerEndpoint: "https://dne-broker.com", + } + config.SetFederation(fed) + config.InitConfig() // Helps us populate the keys directory with a signing key + + allowedPrefixesForCaches.Store(&tc.allowedPrefixes) + rInfo := requestInfo{ + Host: tc.host, + SType: tc.sType, + } + + tok, err := createFedTok(c, rInfo) + + if tc.expectErr { + assert.Error(t, err) + if tc.errContains != "" { + assert.Contains(t, err.Error(), tc.errContains) + } + return + } + + // Make sure we don't have an error and that we _do_ have a token + assert.NoError(t, err) + assert.NotEmpty(t, tok) + + // Verify token contents + scopes, issuer, err := parseJWT(tok) + assert.NoError(t, err) + assert.Equal(t, tc.discoveryUrl, issuer) + if _, exists := tc.allowedPrefixes[tc.host]; !exists { + assert.Len(t, scopes, 1) + assert.Equal(t, "storage.read:/", scopes[0]) + } else { + assert.Len(t, scopes, len(tc.allowedPrefixes[tc.host])) + for _, scope := range scopes { + scope = strings.Split(scope, ":")[1] + assert.Contains(t, tc.allowedPrefixes[tc.host], scope) + } + } + }) + } +} diff --git a/docs/parameters.yaml b/docs/parameters.yaml index 3d6aa540c..ae3bda212 100644 --- a/docs/parameters.yaml +++ b/docs/parameters.yaml @@ -1822,6 +1822,14 @@ hidden: true default: 1m components: ["director"] --- +name: Director.FedTokenLifetime +description: |+ + The default lifetime assigned to tokens issued by the director on behalf of the federation. These tokens may be issued + to caches to prove their authorization within the federation to origins that require it. 
+type: duration +default: 15m +components: ["director"] +--- ############################ # Registry-level configs # ############################ diff --git a/param/parameters.go b/param/parameters.go index d8bdfffa4..599c2c343 100644 --- a/param/parameters.go +++ b/param/parameters.go @@ -411,6 +411,7 @@ var ( Client_StoppedTransferTimeout = DurationParam{"Client.StoppedTransferTimeout"} Director_AdvertisementTTL = DurationParam{"Director.AdvertisementTTL"} Director_CachePresenceTTL = DurationParam{"Director.CachePresenceTTL"} + Director_FedTokenLifetime = DurationParam{"Director.FedTokenLifetime"} Director_OriginCacheHealthTestInterval = DurationParam{"Director.OriginCacheHealthTestInterval"} Director_RegistryQueryInterval = DurationParam{"Director.RegistryQueryInterval"} Director_StatTimeout = DurationParam{"Director.StatTimeout"} diff --git a/param/parameters_struct.go b/param/parameters_struct.go index 2e852707c..1449f4f16 100644 --- a/param/parameters_struct.go +++ b/param/parameters_struct.go @@ -86,6 +86,7 @@ type Config struct { EnableBroker bool `mapstructure:"enablebroker" yaml:"EnableBroker"` EnableOIDC bool `mapstructure:"enableoidc" yaml:"EnableOIDC"` EnableStat bool `mapstructure:"enablestat" yaml:"EnableStat"` + FedTokenLifetime time.Duration `mapstructure:"fedtokenlifetime" yaml:"FedTokenLifetime"` FilteredServers []string `mapstructure:"filteredservers" yaml:"FilteredServers"` GeoIPLocation string `mapstructure:"geoiplocation" yaml:"GeoIPLocation"` MaxMindKeyFile string `mapstructure:"maxmindkeyfile" yaml:"MaxMindKeyFile"` @@ -417,6 +418,7 @@ type configWithType struct { EnableBroker struct { Type string; Value bool } EnableOIDC struct { Type string; Value bool } EnableStat struct { Type string; Value bool } + FedTokenLifetime struct { Type string; Value time.Duration } FilteredServers struct { Type string; Value []string } GeoIPLocation struct { Type string; Value string } MaxMindKeyFile struct { Type string; Value string } diff --git 
a/registry/registry.go b/registry/registry.go index 9de46fc09..fefe69241 100644 --- a/registry/registry.go +++ b/registry/registry.go @@ -78,11 +78,6 @@ type Response struct { DeviceCode string `json:"device_code"` } -type TokenResponse struct { - AccessToken string `json:"access_token"` - Error string `json:"error"` -} - // Various auxiliary functions used for client-server security handshakes type NamespaceConfig struct { JwksUri string `json:"jwks_uri"` @@ -650,7 +645,7 @@ func cliRegisterNamespace(ctx *gin.Context) { return } - var tokenResponse TokenResponse + var tokenResponse server_structs.TokenResponse err = json.Unmarshal(body, &tokenResponse) if err != nil { ctx.JSON(http.StatusInternalServerError, server_structs.SimpleApiResp{ diff --git a/server_structs/auth.go b/server_structs/auth.go index e87524c10..2a9ce028f 100644 --- a/server_structs/auth.go +++ b/server_structs/auth.go @@ -27,4 +27,9 @@ type ( State string `form:"state"` Code string `form:"code"` } + + TokenResponse struct { + AccessToken string `json:"access_token"` + Error string `json:"error"` + } ) From 93f25645d2a575d06d0772ec504fddc98d239fc9 Mon Sep 17 00:00:00 2001 From: Justin Hiemstra Date: Tue, 4 Feb 2025 21:35:34 +0000 Subject: [PATCH 02/11] Implement routine for cache to fetch and store federation tokens This is still missing a lot of necessary testing, but the way I started implementing this has worked me into a hole that makes sorting out cyclic imports hard. Rather than fight through that now, I decided to commit what I have so others can comment on the overall approach. 
--- cache/advertise.go | 67 ++++++++++++ config/config.go | 14 +++ docs/parameters.yaml | 17 +++ launcher_utils/advertise.go | 23 +--- launchers/cache_serve.go | 2 + origin/advertise.go | 24 +++++ param/parameters.go | 2 + param/parameters_struct.go | 8 ++ server_structs/xrootd_server.go | 9 ++ server_utils/server_utils.go | 172 ++++++++++++++++++++++++++++++ server_utils/server_utils_test.go | 85 +++++++++++++++ 11 files changed, 403 insertions(+), 20 deletions(-) diff --git a/cache/advertise.go b/cache/advertise.go index 52b3b0a94..dec8ace09 100644 --- a/cache/advertise.go +++ b/cache/advertise.go @@ -23,12 +23,16 @@ import ( "encoding/json" "net/url" "strings" + "time" "github.com/pkg/errors" + log "github.com/sirupsen/logrus" + "golang.org/x/sync/errgroup" "github.com/pelicanplatform/pelican/config" "github.com/pelicanplatform/pelican/param" "github.com/pelicanplatform/pelican/server_structs" + "github.com/pelicanplatform/pelican/server_utils" "github.com/pelicanplatform/pelican/utils" ) @@ -188,3 +192,66 @@ func (server *CacheServer) GetNamespaceAdsFromDirector() error { func (server *CacheServer) GetServerType() server_structs.ServerType { return server_structs.CacheType } + +func (server *CacheServer) GetAdTokCfg(ctx context.Context) (adTokCfg server_structs.AdTokCfg, err error) { + fInfo, err := config.GetFederation(ctx) + if err != nil { + err = errors.Wrap(err, "failed to get federation info") + return + } + directorUrl := fInfo.DirectorEndpoint + if directorUrl == "" { + err = errors.New("unable to determine Director's URL") + return + } + adTokCfg.Audience = directorUrl + adTokCfg.Subject = param.Cache_Url.GetString() + adTokCfg.Issuer = param.Server_IssuerUrl.GetString() + + return +} + +func (server *CacheServer) GetFedTokLocation() string { + return param.Cache_FedTokenLocation.GetString() +} + +func LaunchFedTokManager(ctx context.Context, egrp *errgroup.Group, cache server_structs.XRootDServer) { + // Do our initial token fetch+set, then turn 
things over to the ticker + tok, err := server_utils.GetFedTok(ctx, cache) + if err != nil { + log.Errorf("Failed to get a federation token: %v", err) + } + + // Set the token in the cache + err = server_utils.SetFedTok(ctx, cache, tok) + if err != nil { + log.Errorf("Failed to set the federation token: %v", err) + } + + // Fire the ticker every at 1/3 the period of token lifetime. This gives us a bit + // of buffer in the event the Director is down for a short period of time. + fedTokTicker := time.NewTicker(param.Director_FedTokenLifetime.GetDuration() / 3) + egrp.Go(func() error { + defer fedTokTicker.Stop() + for { + select { + case <-fedTokTicker.C: + // Time to ask the Director for a new token + log.Debugln("Refreshing federation token") + tok, err := server_utils.GetFedTok(ctx, cache) + if err != nil { + log.Errorf("Failed to get a federation token: %v", err) + continue + } + + // Set the token in the cache + err = server_utils.SetFedTok(ctx, cache, tok) + if err != nil { + log.Errorf("Failed to write the federation token: %v", err) + } + case <-ctx.Done(): + return nil + } + } + }) +} diff --git a/config/config.go b/config/config.go index 3bcf643ed..aa42ab590 100644 --- a/config/config.go +++ b/config/config.go @@ -373,6 +373,9 @@ func discoverFederationImpl(ctx context.Context) (fedInfo pelican_url.Federation fedInfo.JwksUri = viper.GetString("Federation.JwkUrl") fedInfo.BrokerEndpoint = viper.GetString("Federation.BrokerUrl") if fedInfo.DirectorEndpoint != "" && fedInfo.RegistryEndpoint != "" && fedInfo.JwksUri != "" && fedInfo.BrokerEndpoint != "" { + if federationStr != "" { + fedInfo.DiscoveryEndpoint = federationStr + } return } @@ -1507,6 +1510,17 @@ func InitServer(ctx context.Context, currentServers server_structs.ServerType) e } } + // Set fed token locations for cache/origin. Note that fed tokens aren't yet used by the + // Origin (2025-02-04), but they may be soon for things like third party copy. 
+ configDir := viper.GetString("ConfigDir") + if currentServers.IsEnabled(server_structs.OriginType) { + viper.SetDefault(param.Origin_FedTokenLocation.GetName(), filepath.Join(configDir, "origin-fed-token")) + } + if currentServers.IsEnabled(server_structs.CacheType) { + viper.SetDefault(param.Cache_FedTokenLocation.GetName(), filepath.Join(configDir, "cache-fed-token")) + os.Setenv("XRD_PELICANCACHETOKENLOCATION", param.Cache_FedTokenLocation.GetString()) + } + // Unmarshal Viper config into a Go struct unmarshalledConfig, err := param.UnmarshalConfig(viper.GetViper()) if err != nil || unmarshalledConfig == nil { diff --git a/docs/parameters.yaml b/docs/parameters.yaml index ae3bda212..a5902463d 100644 --- a/docs/parameters.yaml +++ b/docs/parameters.yaml @@ -1148,6 +1148,15 @@ root_default: /run/pelican/xrootd/origin/globus default: $XDG_RUNTIME_DIR/pelican/xrootd/origin/globus components: ["origin"] --- +name: Origin.FedTokenLocation +description: |+ + A path to the file containing a token issued by the federation's issuer. This token may be consumed by other federation services + to prove the origin's membership in the federation. For example, a third-party copy from one Origin to another that serves a namespace + without DirectReads enabled may require a token to prove the origin's membership in the federation. +type: filename +default: $ConfigBase/origin-fed-token +components: ["origin"] +--- ############################ # Local cache configs # ############################ @@ -1556,6 +1565,14 @@ description: |+ a client certificate from the client. type: bool default: false +--- +name: Cache.FedTokenLocation +description: |+ + A path to the file containing a token issued by the federation's issuer. This token may be consumed by other federation services + to prove the cache's membership in the federation. For example, Origin's serving a namespace without DirectReads enabled require + that all clients prove they come from within the federation. 
+type: filename +default: $ConfigBase/cache-fed-token components: ["cache"] --- ############################ diff --git a/launcher_utils/advertise.go b/launcher_utils/advertise.go index e51d1b488..c8f17ad39 100644 --- a/launcher_utils/advertise.go +++ b/launcher_utils/advertise.go @@ -41,8 +41,7 @@ import ( "github.com/pelicanplatform/pelican/metrics" "github.com/pelicanplatform/pelican/param" "github.com/pelicanplatform/pelican/server_structs" - "github.com/pelicanplatform/pelican/token" - "github.com/pelicanplatform/pelican/token_scopes" + "github.com/pelicanplatform/pelican/server_utils" "github.com/pelicanplatform/pelican/utils" ) @@ -169,10 +168,6 @@ func advertiseInternal(ctx context.Context, server server_structs.XRootDServer) } serverUrl := param.Origin_Url.GetString() webUrl := param.Server_ExternalWebUrl.GetString() - serverIssuer, err := config.GetServerIssuerURL() - if err != nil { - return errors.Wrap(err, "failed to get server issuer URL") - } if server.GetServerType().IsEnabled(server_structs.CacheType) { serverUrl = param.Cache_Url.GetString() @@ -203,21 +198,9 @@ func advertiseInternal(ctx context.Context, server server_structs.XRootDServer) directorUrl.Path = "/api/v1.0/director/register" + server.GetServerType().String() - advTokenCfg := token.NewWLCGToken() - advTokenCfg.Lifetime = time.Minute - advTokenCfg.Issuer = serverIssuer - advTokenCfg.AddAudiences(fedInfo.DirectorEndpoint) - if server.GetServerType().IsEnabled(server_structs.CacheType) { - advTokenCfg.Subject = "cache" - } else if server.GetServerType().IsEnabled(server_structs.OriginType) { - advTokenCfg.Subject = "origin" - } - advTokenCfg.AddScopes(token_scopes.Pelican_Advertise) - - // CreateToken also handles validation for us - tok, err := advTokenCfg.CreateToken() + tok, err := server_utils.GetAdvertisementTok(ctx, server) if err != nil { - return errors.Wrap(err, "failed to create director advertisement token") + return errors.Wrap(err, "failed to get advertisement token") } req, 
err := http.NewRequestWithContext(ctx, http.MethodPost, directorUrl.String(), bytes.NewBuffer(body)) diff --git a/launchers/cache_serve.go b/launchers/cache_serve.go index e91c7cc99..83809a060 100644 --- a/launchers/cache_serve.go +++ b/launchers/cache_serve.go @@ -105,6 +105,8 @@ func CacheServe(ctx context.Context, engine *gin.Engine, egrp *errgroup.Group, m cache.LaunchDirectorTestFileCleanup(ctx) + cache.LaunchFedTokManager(ctx, egrp, cacheServer) + if param.Cache_SelfTest.GetBool() { err = cache.InitSelfTestDir() if err != nil { diff --git a/origin/advertise.go b/origin/advertise.go index fa9272622..867d62f67 100644 --- a/origin/advertise.go +++ b/origin/advertise.go @@ -199,3 +199,27 @@ func (server *OriginServer) GetAuthorizedPrefixes() ([]string, error) { return prefixes, nil } + +// Advertisement token configuration for the origin server. Used to get Origin-specific +// config that would differ from caches. +func (server *OriginServer) GetAdTokCfg(ctx context.Context) (adTokCfg server_structs.AdTokCfg, err error) { + fInfo, err := config.GetFederation(ctx) + if err != nil { + err = errors.Wrap(err, "failed to get federation info") + return + } + directorUrl := fInfo.DirectorEndpoint + if directorUrl == "" { + err = errors.New("unable to determine Director's URL") + return + } + adTokCfg.Audience = directorUrl + adTokCfg.Subject = param.Origin_Url.GetString() + adTokCfg.Issuer = param.Server_IssuerUrl.GetString() + + return +} + +func (server *OriginServer) GetFedTokLocation() string { + return param.Origin_FedTokenLocation.GetString() +} diff --git a/param/parameters.go b/param/parameters.go index 599c2c343..9277b14ae 100644 --- a/param/parameters.go +++ b/param/parameters.go @@ -149,6 +149,7 @@ var ( Cache_DataLocation = StringParam{"Cache.DataLocation"} Cache_DbLocation = StringParam{"Cache.DbLocation"} Cache_ExportLocation = StringParam{"Cache.ExportLocation"} + Cache_FedTokenLocation = StringParam{"Cache.FedTokenLocation"} Cache_FilesBaseSize = 
StringParam{"Cache.FilesBaseSize"} Cache_FilesMaxSize = StringParam{"Cache.FilesMaxSize"} Cache_FilesNominalSize = StringParam{"Cache.FilesNominalSize"} @@ -220,6 +221,7 @@ var ( OIDC_UserInfoEndpoint = StringParam{"OIDC.UserInfoEndpoint"} Origin_DbLocation = StringParam{"Origin.DbLocation"} Origin_ExportVolume = StringParam{"Origin.ExportVolume"} + Origin_FedTokenLocation = StringParam{"Origin.FedTokenLocation"} Origin_FederationPrefix = StringParam{"Origin.FederationPrefix"} Origin_GlobusClientIDFile = StringParam{"Origin.GlobusClientIDFile"} Origin_GlobusClientSecretFile = StringParam{"Origin.GlobusClientSecretFile"} diff --git a/param/parameters_struct.go b/param/parameters_struct.go index 1449f4f16..850d5c3d7 100644 --- a/param/parameters_struct.go +++ b/param/parameters_struct.go @@ -37,9 +37,14 @@ type Config struct { EnableTLSClientAuth bool `mapstructure:"enabletlsclientauth" yaml:"EnableTLSClientAuth"` EnableVoms bool `mapstructure:"enablevoms" yaml:"EnableVoms"` ExportLocation string `mapstructure:"exportlocation" yaml:"ExportLocation"` +<<<<<<< HEAD + FedTokenLocation string `mapstructure:"fedtokenlocation" yaml:"FedTokenLocation"` FilesBaseSize string `mapstructure:"filesbasesize" yaml:"FilesBaseSize"` FilesMaxSize string `mapstructure:"filesmaxsize" yaml:"FilesMaxSize"` FilesNominalSize string `mapstructure:"filesnominalsize" yaml:"FilesNominalSize"` +======= + FedTokenLocation string `mapstructure:"fedtokenlocation" yaml:"FedTokenLocation"` +>>>>>>> 35924841 (Implement routine for cache to fetch and store federation tokens) HighWaterMark string `mapstructure:"highwatermark" yaml:"HighWaterMark"` LocalRoot string `mapstructure:"localroot" yaml:"LocalRoot"` LowWatermark string `mapstructure:"lowwatermark" yaml:"LowWatermark"` @@ -221,6 +226,7 @@ type Config struct { ExportVolume string `mapstructure:"exportvolume" yaml:"ExportVolume"` ExportVolumes []string `mapstructure:"exportvolumes" yaml:"ExportVolumes"` Exports interface{} `mapstructure:"exports" 
yaml:"Exports"` + FedTokenLocation string `mapstructure:"fedtokenlocation" yaml:"FedTokenLocation"` FederationPrefix string `mapstructure:"federationprefix" yaml:"FederationPrefix"` GlobusClientIDFile string `mapstructure:"globusclientidfile" yaml:"GlobusClientIDFile"` GlobusClientSecretFile string `mapstructure:"globusclientsecretfile" yaml:"GlobusClientSecretFile"` @@ -369,6 +375,7 @@ type configWithType struct { EnableTLSClientAuth struct { Type string; Value bool } EnableVoms struct { Type string; Value bool } ExportLocation struct { Type string; Value string } + FedTokenLocation struct { Type string; Value string } FilesBaseSize struct { Type string; Value string } FilesMaxSize struct { Type string; Value string } FilesNominalSize struct { Type string; Value string } @@ -553,6 +560,7 @@ type configWithType struct { ExportVolume struct { Type string; Value string } ExportVolumes struct { Type string; Value []string } Exports struct { Type string; Value interface{} } + FedTokenLocation struct { Type string; Value string } FederationPrefix struct { Type string; Value string } GlobusClientIDFile struct { Type string; Value string } GlobusClientSecretFile struct { Type string; Value string } diff --git a/server_structs/xrootd_server.go b/server_structs/xrootd_server.go index f3c5175ae..d931d5dea 100644 --- a/server_structs/xrootd_server.go +++ b/server_structs/xrootd_server.go @@ -19,6 +19,7 @@ package server_structs import ( + "context" "strings" ) @@ -29,6 +30,8 @@ type ( GetNamespaceAds() []NamespaceAdV2 CreateAdvertisement(name string, serverUrl string, serverWebUrl string) (*OriginAdvertiseV2, error) GetNamespaceAdsFromDirector() error + GetAdTokCfg(context.Context) (AdTokCfg, error) + GetFedTokLocation() string // Return the PIDs corresponding to the running process(es) for the XRootD // server instance (could be multiple if there's both cmsd and xrootd) @@ -42,6 +45,12 @@ type ( namespaceAds []NamespaceAdV2 } + AdTokCfg struct { + Issuer string + Subject 
string + Audience string + } + ServerPrefix string // The base namespace prefix for origin/cache server ) diff --git a/server_utils/server_utils.go b/server_utils/server_utils.go index 06b40e61c..ebe1d9136 100644 --- a/server_utils/server_utils.go +++ b/server_utils/server_utils.go @@ -30,6 +30,9 @@ import ( "fmt" "io" "net/http" + "net/url" + "os" + "path/filepath" "reflect" "strings" "time" @@ -43,6 +46,8 @@ import ( "github.com/pelicanplatform/pelican/metrics" "github.com/pelicanplatform/pelican/param" "github.com/pelicanplatform/pelican/server_structs" + "github.com/pelicanplatform/pelican/token" + "github.com/pelicanplatform/pelican/token_scopes" ) // GetTopologyJSON returns the namespaces and caches from OSDF topology @@ -321,3 +326,170 @@ func FilterTopLevelPrefixes(nsAds []server_structs.NamespaceAdV2) []server_struc } return uniquePrefixes } + +// Get an advertisement token for the given server. Advertisement tokens are signed by the server +// and passed to the Director, which can then use them to check the server's identity. Tokens are +// valid when the Director can query the public key for the given server from the Registry. +func GetAdvertisementTok(ctx context.Context, server server_structs.XRootDServer) (tok string, err error) { + tokCfg, err := server.GetAdTokCfg(ctx) + if err != nil { + err = errors.Wrap(err, "failed to get advertisement token configuration") + return + } + + advTokenCfg := token.NewWLCGToken() + advTokenCfg.Lifetime = time.Minute + advTokenCfg.Issuer = tokCfg.Issuer + advTokenCfg.AddAudiences(tokCfg.Audience) + // RFC 7519, Section 4.1.2 indicates the "sub" claim MUST be unique within its issuer scope, + // or better yet globally unique. Use the server's host:port to fulfill global uniqueness. 
+ advTokenCfg.Subject = tokCfg.Subject + advTokenCfg.AddScopes(token_scopes.Pelican_Advertise) + + tok, err = advTokenCfg.CreateToken() + if err != nil { + err = errors.Wrap(err, "failed to create director advertisement token") + } + + return +} + +// GetFedTok retrieves a federation token from the Director, which can be passed to other +// federation services as proof of federation membership. +func GetFedTok(ctx context.Context, server server_structs.XRootDServer) (string, error) { + // Set up the request to the Director + fInfo, err := config.GetFederation(ctx) + if err != nil { + return "", errors.Wrap(err, "failed to get federation info") + } + directorUrl := fInfo.DirectorEndpoint + if directorUrl == "" { + return "", errors.New("unable to determine Director's URL") + } + directorEndpoint, err := url.JoinPath(directorUrl, "api", "v1.0", "director", "getFedToken") + if err != nil { + return "", errors.Wrap(err, "unable to join director url") + } + query, err := url.Parse(directorEndpoint) + if err != nil { + return "", errors.Wrap(err, "the configured Director URL appears malformed") + } + + host := param.Server_Hostname.GetString() + adTok, err := GetAdvertisementTok(ctx, server) + if err != nil { + return "", errors.Wrap(err, "failed to get advertisement token") + } + + // The fed token endpoint wants to know the host and server type, + // which it needs to verify the token + params := url.Values{} + params.Add("host", host) + params.Add("sType", server.GetServerType().String()) + query.RawQuery = params.Encode() + + req, err := http.NewRequestWithContext(ctx, "GET", query.String(), nil) + if err != nil { + return "", errors.Wrap(err, "failed to create the request to get a federation token") + } + req.Header.Set("Authorization", "Bearer "+adTok) + userAgent := "pelican-" + strings.ToLower(server.GetServerType().String()) + "/" + config.GetVersion() + req.Header.Set("User-Agent", userAgent) + + tr := config.GetTransport() + client := http.Client{Transport: 
tr} + + resp, err := client.Do(req) + if err != nil { + return "", errors.Wrap(err, "failed to start the request for director advertisement") + } + defer resp.Body.Close() + + body, err := io.ReadAll(resp.Body) + if err != nil { + return "", errors.Wrap(err, "failed to read the response body for director advertisement") + } + + if resp.StatusCode != http.StatusOK { + // Unmarshal the body as a simple api response + var apiResp server_structs.SimpleApiResp + if err := json.Unmarshal(body, &apiResp); err != nil { + return "", errors.Wrap(err, "failed to unmarshal error response from Director's federation token endpoint") + } + + return "", errors.New(apiResp.Msg) + } + + // Attempt to unmarshal the body into our token struct + var tokResponse server_structs.TokenResponse + if err := json.Unmarshal(body, &tokResponse); err != nil { + // Check for a simple api error response + var apiResp server_structs.SimpleApiResp + if err := json.Unmarshal(body, &apiResp); err == nil { + return "", errors.New(apiResp.Msg) + } + + return "", errors.Wrap(err, "failed to unmarshal the response body for director advertisement") + } + + return tokResponse.AccessToken, nil +} + +// SetFedTok does an atomic write of a federation token to the server's token location. +func SetFedTok(ctx context.Context, server server_structs.XRootDServer, tok string) error { + tokLoc := server.GetFedTokLocation() + if tokLoc == "" { + return errors.New("token location is empty") + } + + dir := filepath.Dir(tokLoc) + if err := os.MkdirAll(dir, 0755); err != nil { + if !os.IsExist(err) { + return errors.Wrap(err, "failed to create fed token directories") + } + } + + // Create a temporary file for storing the token. 
Later we'll do an atomic rename + tmpName := filepath.Join(dir, fmt.Sprintf(".fedtoken.%d", time.Now().UnixNano())) + tmpFile, err := os.OpenFile(tmpName, os.O_WRONLY|os.O_CREATE|os.O_EXCL, 0600) + if err != nil { + return errors.Wrap(err, "failed to create temporary token file") + } + + defer func() { + tmpFile.Close() + os.Remove(tmpName) + }() + + // Change ownership to xrootd user + uid, err := config.GetDaemonUID() + if err != nil { + return errors.Wrap(err, "failed to get daemon UID") + } + gid, err := config.GetDaemonGID() + if err != nil { + return errors.Wrap(err, "failed to get daemon GID") + } + + if err := os.Chown(tmpName, uid, gid); err != nil { + return errors.Wrap(err, "failed to change token file ownership") + } + + if _, err := tmpFile.WriteString(tok); err != nil { + return errors.Wrap(err, "failed to write token to temporary file") + } + + if err := tmpFile.Sync(); err != nil { + return errors.Wrap(err, "failed to sync token file") + } + + if err := tmpFile.Close(); err != nil { + return errors.Wrap(err, "failed to close temporary token file") + } + + if err := os.Rename(tmpName, tokLoc); err != nil { + return errors.Wrap(err, "failed to move token file to final location") + } + + return nil +} diff --git a/server_utils/server_utils_test.go b/server_utils/server_utils_test.go index 1bfbe25f7..13fff21d3 100644 --- a/server_utils/server_utils_test.go +++ b/server_utils/server_utils_test.go @@ -24,6 +24,8 @@ import ( "fmt" "net/http" "net/http/httptest" + "os" + "path/filepath" "testing" "time" @@ -189,3 +191,86 @@ func TestFilterTopLevelPrefixes(t *testing.T) { assert.ElementsMatch(t, expectedPaths, filteredPaths) } + +// Mocked server to fulfill the XRootDServer interface in testing +type mockServer struct { + tokenLoc string + uid int + gid int + pids []int + serverType server_structs.ServerType + namespaceAds []server_structs.NamespaceAdV2 +} + +func (m *mockServer) GetServerType() server_structs.ServerType { return m.serverType } +func (m 
*mockServer) SetNamespaceAds(ads []server_structs.NamespaceAdV2) { m.namespaceAds = ads } +func (m *mockServer) GetNamespaceAds() []server_structs.NamespaceAdV2 { return m.namespaceAds } +func (m *mockServer) CreateAdvertisement(name, serverUrl, serverWebUrl string) (*server_structs.OriginAdvertiseV2, error) { + return nil, nil +} +func (m *mockServer) GetNamespaceAdsFromDirector() error { return nil } +func (m *mockServer) GetAdTokCfg(ctx context.Context) (server_structs.AdTokCfg, error) { + return server_structs.AdTokCfg{}, nil +} +func (m *mockServer) GetFedTokLocation() string { return m.tokenLoc } +func (m *mockServer) GetPids() []int { return m.pids } +func (m *mockServer) SetPids(pids []int) { m.pids = pids } + +func TestSetFedTok(t *testing.T) { + testCases := []struct { + name string + server *mockServer + token string + setupDir bool // Whether to create directory structure + expectErr bool + errMsg string + }{ + { + name: "Valid token write", + server: &mockServer{ + tokenLoc: filepath.Join(t.TempDir(), "tokens", "fed.token"), + uid: os.Getuid(), + gid: os.Getgid(), + }, + token: "test-token", + setupDir: true, + expectErr: false, + errMsg: "", + }, + { + name: "Empty token location", + server: &mockServer{ + tokenLoc: "", + }, + token: "test-token", + setupDir: false, + expectErr: true, + errMsg: "token location is empty", + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + if tc.setupDir { + require.NoError(t, os.MkdirAll(filepath.Dir(tc.server.tokenLoc), 0755)) + } + + err := SetFedTok(context.Background(), tc.server, tc.token) + + if tc.expectErr { + require.Error(t, err) + assert.Contains(t, err.Error(), tc.errMsg) + return + } + require.NoError(t, err) + + content, err := os.ReadFile(tc.server.tokenLoc) + require.NoError(t, err) + assert.Equal(t, tc.token, string(content)) + + info, err := os.Stat(tc.server.tokenLoc) + require.NoError(t, err) + assert.Equal(t, os.FileMode(0600), info.Mode().Perm()) + }) + } +} From 
dbbf5921f92670f26219b509d770a4e106600301 Mon Sep 17 00:00:00 2001 From: Justin Hiemstra Date: Wed, 12 Feb 2025 22:53:07 +0000 Subject: [PATCH 03/11] Adjust fed token maint ticker to fire based on discovered tok lifetime Previously I grabbed a default configuration from `Director.FedTokenLifetime`, but that bothered me because we can tell the token's actual lifetime just by looking at it. So that's what I do now. However, I'm punting on dynamic adjustment of this refresh cycle, which might be needed if the Director ever starts issuing shorter-lived tokens. --- cache/advertise.go | 23 ++++++++++++++++++++--- 1 file changed, 20 insertions(+), 3 deletions(-) diff --git a/cache/advertise.go b/cache/advertise.go index dec8ace09..73482eac8 100644 --- a/cache/advertise.go +++ b/cache/advertise.go @@ -25,6 +25,7 @@ import ( "strings" "time" + "github.com/lestrrat-go/jwx/v2/jwt" "github.com/pkg/errors" log "github.com/sirupsen/logrus" "golang.org/x/sync/errgroup" @@ -222,15 +223,31 @@ func LaunchFedTokManager(ctx context.Context, egrp *errgroup.Group, cache server log.Errorf("Failed to get a federation token: %v", err) } + // We want to fire the ticker at 1/3 the period of the token lifetime, or 1/3 the default + // lifetime for the token if we can't otherwise determine it. In most cases, the two values + // will be the same unless some fed administrator thinks they know better! This 1/3 period approach + // gives us a bit of buffer in the event the Director is down for a short period of time. + // + // I think verificationless parsing is fine here, because we already assume a strong + // trust relationship with the Director, and if its been compromised, we have bigger problems. 
+ var tickerRate time.Duration + if parsedTok, err := jwt.ParseInsecure([]byte(tok)); err != nil { + log.Errorf("Failed to parse a federation token from the Director: %v", err) + tickerRate = param.Director_FedTokenLifetime.GetDuration() + } else { + tickerRate = parsedTok.Expiration().Sub(parsedTok.IssuedAt()) + } + tickerRate /= 3 + // Set the token in the cache err = server_utils.SetFedTok(ctx, cache, tok) if err != nil { log.Errorf("Failed to set the federation token: %v", err) } - // Fire the ticker every at 1/3 the period of token lifetime. This gives us a bit - // of buffer in the event the Director is down for a short period of time. - fedTokTicker := time.NewTicker(param.Director_FedTokenLifetime.GetDuration() / 3) + // TODO: Figure out what to do if the Director starts issuing tokens with a different + // lifetime --> we can adjust ticker period dynamically, but what's the sensible thing to do? + fedTokTicker := time.NewTicker(tickerRate) egrp.Go(func() error { defer fedTokTicker.Stop() for { From 7888184693232b6d17e93672c4cd259163fe5140 Mon Sep 17 00:00:00 2001 From: Justin Hiemstra Date: Wed, 12 Feb 2025 23:12:33 +0000 Subject: [PATCH 04/11] Add additional fed token tests These use the new e2e fed test package so that I can spin up an entire federation while still importing individual components of packages that are also used by the federation. In particular, these tests probe Director/Registry/Cache interactions to make sure the whole chain works, as well as making sure the cache's fed token maintenance thread handles updating tokens based on their detected lifetimes. I'm punting on any dynamic adjustment of the period caches request this token, e.g. in the case the Director reboots with a much shorter fed token lifetime. 
--- cache/advertise.go | 3 + config/config.go | 7 ++ director/fed_token.go | 9 ++- e2e_fed_tests/cache_test.go | 80 ++++++++++++++++++++ e2e_fed_tests/director_test.go | 133 +++++++++++++++++++++++++++++++++ registry/registry_db.go | 4 +- 6 files changed, 234 insertions(+), 2 deletions(-) create mode 100644 e2e_fed_tests/cache_test.go diff --git a/cache/advertise.go b/cache/advertise.go index 73482eac8..b65e97210 100644 --- a/cache/advertise.go +++ b/cache/advertise.go @@ -238,6 +238,7 @@ func LaunchFedTokManager(ctx context.Context, egrp *errgroup.Group, cache server tickerRate = parsedTok.Expiration().Sub(parsedTok.IssuedAt()) } tickerRate /= 3 + log.Debugf("Federation token refresh rate set to %.3fm", tickerRate.Minutes()) // Set the token in the cache err = server_utils.SetFedTok(ctx, cache, tok) @@ -260,12 +261,14 @@ func LaunchFedTokManager(ctx context.Context, egrp *errgroup.Group, cache server log.Errorf("Failed to get a federation token: %v", err) continue } + log.Traceln("Successfully received new federation token") // Set the token in the cache err = server_utils.SetFedTok(ctx, cache, tok) if err != nil { log.Errorf("Failed to write the federation token: %v", err) } + log.Traceln("Successfully wrote new federation token to disk") case <-ctx.Done(): return nil } diff --git a/config/config.go b/config/config.go index aa42ab590..fce32ec53 100644 --- a/config/config.go +++ b/config/config.go @@ -360,6 +360,13 @@ func discoverFederationImpl(ctx context.Context) (fedInfo pelican_url.Federation fedInfo.BrokerEndpoint = externalUrlStr } + // Some services in the Director, like federation tokens, may require a defined federation root (discovery URL) + // so for these to have a chance at working in places where a Director truly is acting as the fed root (e.g. + // fed-in-box unit tests), we need to set the discovery endpoint to the director endpoint. 
+ if fedInfo.DiscoveryEndpoint == "" && enabledServers.IsEnabled(server_structs.DirectorType) { + fedInfo.DiscoveryEndpoint = fedInfo.DirectorEndpoint + } + // Make sure any values in global federation metadata are url-parseable fedInfo.DirectorEndpoint = wrapWithHttpsIfNeeded(fedInfo.DirectorEndpoint) fedInfo.RegistryEndpoint = wrapWithHttpsIfNeeded(fedInfo.RegistryEndpoint) diff --git a/director/fed_token.go b/director/fed_token.go index 26abacc75..d3d3790fe 100644 --- a/director/fed_token.go +++ b/director/fed_token.go @@ -21,6 +21,7 @@ package director import ( "fmt" "net/http" + "strings" "github.com/gin-gonic/gin" "github.com/pkg/errors" @@ -115,12 +116,18 @@ func createFedTok(ginCtx *gin.Context, rInfo requestInfo) (tok string, err error hostPrefixes, exists := allowedPrefixes[rInfo.Host] if !exists { - // If there are no prefixes, we assume the cache is configured to read all namespaces + // If the entry does not exist, we assume there are no prefix restrictions hostPrefixes = map[string]struct{}{"/": {}} } scopes := make([]token_scopes.TokenScope, 0, len(allowedPrefixes[rInfo.Host])) for prefix := range hostPrefixes { + // Provide at least _some_ basic validation before we use this value to mint read tokens... 
+ if !strings.HasPrefix(prefix, "/") { + log.Warningf("Invalid prefix from registry's 'AllowedPrefixes' custom field found while creating "+ + "federation token for host '%s': '%s'", rInfo.Host, prefix) + continue + } var readScope token_scopes.TokenScope readScope, err = token_scopes.Storage_Read.Path(prefix) if err != nil { diff --git a/e2e_fed_tests/cache_test.go b/e2e_fed_tests/cache_test.go new file mode 100644 index 000000000..267606b1b --- /dev/null +++ b/e2e_fed_tests/cache_test.go @@ -0,0 +1,80 @@ +//go:build !windows + +/*************************************************************** +* +* Copyright (C) 2025, Pelican Project, Morgridge Institute for Research +* +* Licensed under the Apache License, Version 2.0 (the "License"); you +* may not use this file except in compliance with the License. You may +* obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +* +***************************************************************/ + +package fed_tests + +import ( + "context" + _ "embed" + "os" + "testing" + "time" + + _ "github.com/glebarez/sqlite" + "github.com/spf13/viper" + "github.com/stretchr/testify/require" + + "github.com/pelicanplatform/pelican/cache" + "github.com/pelicanplatform/pelican/fed_test_utils" + "github.com/pelicanplatform/pelican/param" + "github.com/pelicanplatform/pelican/server_utils" + "github.com/pelicanplatform/pelican/test_utils" +) + +// Test that token maintenance for the cache works as expected -- we never +// want to let the on-disk token expire. 
+func TestCacheFedTokMaint(t *testing.T) { + server_utils.ResetTestState() + defer server_utils.ResetTestState() + + // Spin up the full fed so that our cache server can get the token from the director + viper.Set(param.Director_FedTokenLifetime.GetName(), "12s") + _ = fed_test_utils.NewFedTest(t, bothPubNamespaces) + // Now unset this to prove the cache maint thread is using token lifetime, and not + // a value the maintenance thread gets from viper. + viper.Set(param.Director_FedTokenLifetime.GetName(), nil) + + // Run the token maintenance routine for two periods and make sure + // the cache token on disk is never older than 4s (1/3 the configured lifetime) + ctx := context.Background() + ctx, cancel, egrp := test_utils.TestContext(ctx, t) + defer cancel() + cacheServer := cache.CacheServer{} + cache.LaunchFedTokManager(ctx, egrp, &cacheServer) + tokFile := cacheServer.GetFedTokLocation() + + ticker := time.NewTicker(1 * time.Second) + defer ticker.Stop() + + timeout := time.After(24 * time.Second) + for { + select { + case <-ticker.C: + info, err := os.Stat(tokFile) + require.NoError(t, err, "Failed to stat token file") + age := time.Since(info.ModTime()) + if age > 4*time.Second { + t.Fatalf("Token file age exceeded 4s: %v", age) + } + case <-timeout: + return + } + } +} diff --git a/e2e_fed_tests/director_test.go b/e2e_fed_tests/director_test.go index e1d2a268c..7d6d77b28 100644 --- a/e2e_fed_tests/director_test.go +++ b/e2e_fed_tests/director_test.go @@ -28,13 +28,20 @@ import ( "io" "net/http" "net/url" + "strings" "testing" + "time" + _ "github.com/glebarez/sqlite" + "github.com/lestrrat-go/jwx/v2/jwt" + "github.com/spf13/viper" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "github.com/pelicanplatform/pelican/cache" "github.com/pelicanplatform/pelican/config" "github.com/pelicanplatform/pelican/fed_test_utils" + "github.com/pelicanplatform/pelican/param" "github.com/pelicanplatform/pelican/server_structs" 
"github.com/pelicanplatform/pelican/server_utils" @@ -122,3 +129,129 @@ func TestDirectorCacheHealthTest(t *testing.T) { assert.Equal(t, resp.StatusCode, http.StatusOK, "Failed to get director test file from cache") assert.Contains(t, string(cacheBody), "This object was created by the Pelican director-test functionality") } + +func updateAllowedPrefixesForCache(t *testing.T, dbPath string, cacheHost string, allowedPrefixes []string) { + // We treat the absence of this custom field differently than its presence + // and an empty list. + if len(allowedPrefixes) == 0 { + return + } + + db, err := server_utils.InitSQLiteDB(dbPath) + require.NoError(t, err, "Failed to connect to registry database") + defer func() { + _ = server_utils.ShutdownDB(db) + }() + + var namespace server_structs.Namespace + result := db.Where("prefix = ?", "/caches/"+cacheHost).First(&namespace) + require.NoError(t, result.Error, "Failed to find namespace for host %s: %v", cacheHost, result.Error) + + if namespace.CustomFields == nil { + namespace.CustomFields = make(map[string]interface{}) + } + namespace.CustomFields["AllowedPrefixes"] = allowedPrefixes + + result = db.Model(&namespace).Updates(server_structs.Namespace{ + CustomFields: namespace.CustomFields, + }) + require.NoError(t, result.Error, "Failed to update namespace for host %s: %v", cacheHost, result.Error) + if result.RowsAffected == 0 { + require.Fail(t, "No rows affected when updating namespace for host %s", cacheHost) + } +} + +// Test that registered services can grab a token from the Director +// using a valid advertise token. For now this only tests Caches because +// we aren't actively using fed tokens in the Origin yet. 
+func TestDirectorFedTokenCacheAPI(t *testing.T) { + server_utils.ResetTestState() + defer server_utils.ResetTestState() + + testCases := []struct { + name string + allowedPrefixes []string + scopeShouldHave []string + scopeShouldNotHave []string + }{ + { + name: "AllowFirstNamespace", + allowedPrefixes: []string{"/first/namespace"}, + scopeShouldHave: []string{"storage.read:/first/namespace"}, + scopeShouldNotHave: []string{"/second/namespace"}, + }, + { + name: "AllowBothNamespaces", + allowedPrefixes: []string{"/first/namespace", "/second/namespace"}, + scopeShouldHave: []string{"storage.read:/first/namespace", "storage.read:/second/namespace"}, + scopeShouldNotHave: []string{}, + }, + { + name: "NoCustomField", + allowedPrefixes: []string{}, + scopeShouldHave: []string{"storage.read:/"}, // Absence of field means no namespace restrictions + scopeShouldNotHave: []string{}, + }, + { + name: "EmptyCustomField", + allowedPrefixes: []string{""}, + scopeShouldHave: []string{}, // Empty field means no read permissions + scopeShouldNotHave: []string{}, + }, + { + name: "GlobNamespace", + allowedPrefixes: []string{"*"}, + scopeShouldHave: []string{"storage.read:/"}, + scopeShouldNotHave: []string{}, + }, + // After some discussion with Sarthak, we decided there's no point in testing + // the case where the Registry is configured with an invalid namespace -- we + // make the assumption that namespace info is validated by the Registry before + // insertion in its database. 
+ } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + viper.Set(param.Director_RegistryQueryInterval.GetName(), "1s") + _ = fed_test_utils.NewFedTest(t, bothPubNamespaces) + + // All servers running as part of fed-in-a-box will have the same hostname + // so we can use that fact when injecting allowed prefixes into the registry database + host := param.Server_Hostname.GetString() + require.NotEmpty(t, host, "Failed to determine server hostname") + + // Inject our "AllowedPrefixes" data into the registry database under + // the /caches/ namespace + dbLoc := param.Registry_DbLocation.GetString() + require.NotEmpty(t, dbLoc, "Failed to determine registry database location") + updateAllowedPrefixesForCache(t, dbLoc, host, tc.allowedPrefixes) + + // Now sleep for 2 seconds so the Director has time to populate the changes + time.Sleep(2 * time.Second) + + // Grab the service's key and create an advertise token + ctx := context.Background() + ctx, _, _ = test_utils.TestContext(ctx, t) + cache := cache.CacheServer{} + tokStr, err := server_utils.GetFedTok(ctx, &cache) + require.NoError(t, err, "Failed to get cache's advertisement token") + require.NotEmpty(t, tokStr, "Got an empty token") + + tok, err := jwt.ParseInsecure([]byte(tokStr)) + require.NoError(t, err, "Failed to parse token") + // In this case, the "fed issuer" is the director because we're running as fed-in-a-box. + // However, that need not be true in general wherever the Director has a configured Federation.DiscoveryUrl. 
+ fedInfo, err := config.GetFederation(ctx) + require.NoError(t, err, "Failed to get federation info") + directorUrlStr := fedInfo.DirectorEndpoint + assert.Equal(t, directorUrlStr, tok.Issuer()) + var scopes []string + if rawScopes, exists := tok.Get("scope"); exists { + if scopeStr, ok := rawScopes.(string); ok { + scopes = strings.Split(scopeStr, " ") + } + } + assert.ElementsMatch(t, tc.scopeShouldHave, scopes) + }) + } +} diff --git a/registry/registry_db.go b/registry/registry_db.go index 024f282cc..161914cc7 100644 --- a/registry/registry_db.go +++ b/registry/registry_db.go @@ -283,7 +283,9 @@ func getNamespaceByPrefix(prefix string) (*server_structs.Namespace, error) { // getAllowedPrefixesForCaches queries the database to create a map of cache // hostnames to a list of prefixes that each cache is allowed to serve. // If a cache hostname key is not present in the resultant map, it implies the -// default behavior where the cache is allowed to serve all prefixes. +// default behavior where the cache is allowed to serve all prefixes. However, +// if the cache hostname key is present with an empty list of prefixes, it implies +// the cache is not allowed to serve any prefixes. It is explicitly NOT treated like "*". 
func getAllowedPrefixesForCaches() (map[string][]string, error) { var namespaces []server_structs.Namespace From 0e8509369504cd4450c65fe84f4d5a9c382eec9d Mon Sep 17 00:00:00 2001 From: Justin Hiemstra Date: Thu, 13 Feb 2025 18:59:38 +0000 Subject: [PATCH 05/11] Add circuit breaker to fed token refresher and base period on token life --- cache/advertise.go | 74 +++++++++++++++++++++++++++++++------ e2e_fed_tests/cache_test.go | 14 +++++-- param/parameters_struct.go | 4 -- 3 files changed, 72 insertions(+), 20 deletions(-) diff --git a/cache/advertise.go b/cache/advertise.go index b65e97210..0c9ce49a5 100644 --- a/cache/advertise.go +++ b/cache/advertise.go @@ -45,6 +45,9 @@ type ( } ) +// Can use this mechanism to override the minimum for the sake of tests +var MinFedTokenTickerRate = 1 * time.Minute + func (server *CacheServer) CreateAdvertisement(name, originUrl, originWebUrl string) (*server_structs.OriginAdvertiseV2, error) { registryPrefix := server_structs.GetCacheNS(param.Xrootd_Sitename.GetString()) ad := server_structs.OriginAdvertiseV2{ @@ -216,6 +219,56 @@ func (server *CacheServer) GetFedTokLocation() string { return param.Cache_FedTokenLocation.GetString() } +// Given a token, calculate the lifetime of the token +func calcTokLifetime(tok string) (time.Duration, error) { + // I think verificationless parsing is fine here, because we already assume a strong + // trust relationship with the Director, and if it's been compromised, we have bigger problems. + parsedTok, err := jwt.ParseInsecure([]byte(tok)) + if err != nil { + return 0, err + } + return parsedTok.Expiration().Sub(parsedTok.IssuedAt()), nil +} + +// validateTickerRate is the circuit breaker that prevents the ticker +// from firing too often. It also handles logging errors/warnings related +// to the token lifetime and the refresh rate. 
+func validateTickerRate(tickerRate time.Duration, tokLifetime time.Duration) time.Duration { + validated := tickerRate + + if validated < MinFedTokenTickerRate { + log.Warningf("Deduced federation token refresh period is less than minimum of %.3fm; setting to %.3fm", + MinFedTokenTickerRate.Minutes(), MinFedTokenTickerRate.Minutes()) + validated = MinFedTokenTickerRate + } + + // Unfortunately we can't do anything here about the Director sending + // such short-lived tokens unless we're willing to forgo the circuit + // breaker. + if validated > tokLifetime { + log.Errorf("Deduced federation token refresh period exceeds token lifetime. Tokens will expire before refresh") + } + + log.Debugf("Federation token refresh rate set to %.3fm", validated.Minutes()) + + return validated +} + +// getTickerRate calculates the rate at which the federation token should be refreshed +// by looking at the token lifetime and setting the ticker to 1/3 of that lifetime. +// If the token lifetime cannot be determined, 1/3 of the configured default lifetime is used +// instead. Either way, the result is never less than MinFedTokenTickerRate. +func getTickerRate(tok string) time.Duration { + var tickerRate time.Duration + tokenLifetime, err := calcTokLifetime(tok) + if err != nil { + tokenLifetime = param.Director_FedTokenLifetime.GetDuration() + log.Errorf("Failed to calculate lifetime of federation token: %v.", err) + } + tickerRate = tokenLifetime / 3 + return validateTickerRate(tickerRate, tokenLifetime) +} + func LaunchFedTokManager(ctx context.Context, egrp *errgroup.Group, cache server_structs.XRootDServer) { // Do our initial token fetch+set, then turn things over to the ticker tok, err := server_utils.GetFedTok(ctx, cache) @@ -227,18 +280,7 @@ func LaunchFedTokManager(ctx context.Context, egrp *errgroup.Group, cache server // lifetime for the token if we can't otherwise determine it. In most cases, the two values // will be the same unless some fed administrator thinks they know better! 
This 1/3 period approach // gives us a bit of buffer in the event the Director is down for a short period of time. - // - // I think verificationless parsing is fine here, because we already assume a strong - // trust relationship with the Director, and if its been compromised, we have bigger problems. - var tickerRate time.Duration - if parsedTok, err := jwt.ParseInsecure([]byte(tok)); err != nil { - log.Errorf("Failed to parse a federation token from the Director: %v", err) - tickerRate = param.Director_FedTokenLifetime.GetDuration() - } else { - tickerRate = parsedTok.Expiration().Sub(parsedTok.IssuedAt()) - } - tickerRate /= 3 - log.Debugf("Federation token refresh rate set to %.3fm", tickerRate.Minutes()) + tickerRate := getTickerRate(tok) // Set the token in the cache err = server_utils.SetFedTok(ctx, cache, tok) @@ -263,6 +305,14 @@ func LaunchFedTokManager(ctx context.Context, egrp *errgroup.Group, cache server } log.Traceln("Successfully received new federation token") + // Once again, parse the token, use it to set the next ticker fire + // while also building in a circuit breaker to set a min ticker rate + newTickerRate := getTickerRate(tok) + if newTickerRate != tickerRate { + fedTokTicker.Reset(newTickerRate) + tickerRate = newTickerRate + } + // Set the token in the cache err = server_utils.SetFedTok(ctx, cache, tok) if err != nil { diff --git a/e2e_fed_tests/cache_test.go b/e2e_fed_tests/cache_test.go index 267606b1b..7e4547f6b 100644 --- a/e2e_fed_tests/cache_test.go +++ b/e2e_fed_tests/cache_test.go @@ -24,6 +24,7 @@ import ( "context" _ "embed" "os" + "path/filepath" "testing" "time" @@ -46,10 +47,12 @@ func TestCacheFedTokMaint(t *testing.T) { // Spin up the full fed so that our cache server can get the token from the director viper.Set(param.Director_FedTokenLifetime.GetName(), "12s") + oldMinTokRate := cache.MinFedTokenTickerRate + defer func() { + cache.MinFedTokenTickerRate = oldMinTokRate + }() + cache.MinFedTokenTickerRate = 1 * time.Second 
_ = fed_test_utils.NewFedTest(t, bothPubNamespaces) - // Now unset this to prove the cache maint thread is using token lifetime, and not - // a value the maintenance thread gets from viper. - viper.Set(param.Director_FedTokenLifetime.GetName(), nil) // Run the token maintenance routine for two periods and make sure // the cache token on disk is never older than 4s (1/3 the configured lifetime) @@ -57,6 +60,9 @@ func TestCacheFedTokMaint(t *testing.T) { ctx, cancel, egrp := test_utils.TestContext(ctx, t) defer cancel() cacheServer := cache.CacheServer{} + + // Give this "cache" instance a unique location so it doesn't compete with the fed test cache token + viper.Set(param.Cache_FedTokenLocation.GetName(), filepath.Join(t.TempDir(), t.Name()+"_fedtok")) cache.LaunchFedTokManager(ctx, egrp, &cacheServer) tokFile := cacheServer.GetFedTokLocation() @@ -70,7 +76,7 @@ func TestCacheFedTokMaint(t *testing.T) { info, err := os.Stat(tokFile) require.NoError(t, err, "Failed to stat token file") age := time.Since(info.ModTime()) - if age > 4*time.Second { + if age > (4*time.Second + 500*time.Millisecond) { // build in a little slop t.Fatalf("Token file age exceeded 4s: %v", age) } case <-timeout: diff --git a/param/parameters_struct.go b/param/parameters_struct.go index 850d5c3d7..8c09c5eae 100644 --- a/param/parameters_struct.go +++ b/param/parameters_struct.go @@ -37,14 +37,10 @@ type Config struct { EnableTLSClientAuth bool `mapstructure:"enabletlsclientauth" yaml:"EnableTLSClientAuth"` EnableVoms bool `mapstructure:"enablevoms" yaml:"EnableVoms"` ExportLocation string `mapstructure:"exportlocation" yaml:"ExportLocation"` -<<<<<<< HEAD FedTokenLocation string `mapstructure:"fedtokenlocation" yaml:"FedTokenLocation"` FilesBaseSize string `mapstructure:"filesbasesize" yaml:"FilesBaseSize"` FilesMaxSize string `mapstructure:"filesmaxsize" yaml:"FilesMaxSize"` FilesNominalSize string `mapstructure:"filesnominalsize" yaml:"FilesNominalSize"` -======= - FedTokenLocation string 
`mapstructure:"fedtokenlocation" yaml:"FedTokenLocation"` ->>>>>>> 35924841 (Implement routine for cache to fetch and store federation tokens) HighWaterMark string `mapstructure:"highwatermark" yaml:"HighWaterMark"` LocalRoot string `mapstructure:"localroot" yaml:"LocalRoot"` LowWatermark string `mapstructure:"lowwatermark" yaml:"LowWatermark"` From 49eb90979b5cd5e7435ae3698fb350ede94c13bc Mon Sep 17 00:00:00 2001 From: Justin Hiemstra Date: Thu, 13 Feb 2025 19:33:51 +0000 Subject: [PATCH 06/11] Add fed token API to swagger docs --- swagger/pelican-swagger.yaml | 61 ++++++++++++++++++++++++++++++++++++ 1 file changed, 61 insertions(+) diff --git a/swagger/pelican-swagger.yaml b/swagger/pelican-swagger.yaml index 0f6633d05..4dee0617c 100644 --- a/swagger/pelican-swagger.yaml +++ b/swagger/pelican-swagger.yaml @@ -2019,6 +2019,67 @@ paths: schema: type: object $ref: "#/definitions/ErrorModelV2" + /director/getFedToken: + get: + summary: Get a token signed by the federation's issuer + description: Generates a federation token for a given host and server type. + parameters: + - name: host + in: query + required: true + schema: + type: string + description: The hostname of the server. + - name: sType + in: query + required: true + schema: + type: string + enum: [Cache, Origin] + description: The server type (Cache or Origin). + - name: authz + in: query + required: true + schema: + type: string + description: The authorization token, may also be passed via "Authorization" header. + responses: + '200': + description: Successfully generated federation token. + content: + application/json: + schema: + type: object + properties: + AccessToken: + type: string + description: The generated federation token. + '403': + description: Forbidden - validation error or unauthorized request. + content: + application/json: + schema: + type: object + properties: + Status: + type: string + example: RespFailed + Msg: + type: string + description: Error message. 
+ '500': + description: Internal Server Error - unexpected error occurred. + content: + application/json: + schema: + type: object + properties: + Status: + type: string + example: RespFailed + Msg: + type: string + description: Error message. /origin_ui/exports: get: summary: Returns the data exports of the origin server From ee2e34bc0abf953d6ce2a901ddb6bf709ff993e6 Mon Sep 17 00:00:00 2001 From: Justin Hiemstra Date: Thu, 13 Feb 2025 19:54:39 +0000 Subject: [PATCH 07/11] Remove test whose assumption is no longer true --- director/fed_token_test.go | 9 --------- 1 file changed, 9 deletions(-) diff --git a/director/fed_token_test.go b/director/fed_token_test.go index d8bd7f44b..7441c0085 100644 --- a/director/fed_token_test.go +++ b/director/fed_token_test.go @@ -206,15 +206,6 @@ func TestCreateFedTok(t *testing.T) { }, expectErr: false, }, - { - name: "Malformed discovery config", - host: "test-cache.example.com", - sType: server_structs.CacheType, - discoveryUrl: "", - allowedPrefixes: map[string]map[string]struct{}{}, - expectErr: true, - errContains: "federation issuer is not set", - }, { name: "No allowed prefixes defaults to root of namespace", host: "test-cache.example.com", From 978fc4a9557756d685154306661e202348f45f4e Mon Sep 17 00:00:00 2001 From: Justin Hiemstra Date: Thu, 13 Feb 2025 20:09:34 +0000 Subject: [PATCH 08/11] Don't run server util tests on Windows --- server_utils/server_utils_test.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/server_utils/server_utils_test.go b/server_utils/server_utils_test.go index 13fff21d3..bea90c7de 100644 --- a/server_utils/server_utils_test.go +++ b/server_utils/server_utils_test.go @@ -1,3 +1,5 @@ +//go:build !windows + /*************************************************************** * * Copyright (C) 2024, Pelican Project, Morgridge Institute for Research From 02fe7cb3b74249478c90eba8d731d94897e9bf09 Mon Sep 17 00:00:00 2001 From: Justin Hiemstra Date: Fri, 21 Feb 2025 14:56:33 +0000 Subject: [PATCH 09/11] 
Address review feedback --- cache/advertise.go | 4 ++-- director/fed_token.go | 10 +++++----- e2e_fed_tests/director_test.go | 2 +- server_utils/server_utils.go | 13 ++++++------- 4 files changed, 14 insertions(+), 15 deletions(-) diff --git a/cache/advertise.go b/cache/advertise.go index 0c9ce49a5..71b4937b7 100644 --- a/cache/advertise.go +++ b/cache/advertise.go @@ -271,7 +271,7 @@ func getTickerRate(tok string) time.Duration { func LaunchFedTokManager(ctx context.Context, egrp *errgroup.Group, cache server_structs.XRootDServer) { // Do our initial token fetch+set, then turn things over to the ticker - tok, err := server_utils.GetFedTok(ctx, cache) + tok, err := server_utils.CreateFedTok(ctx, cache) if err != nil { log.Errorf("Failed to get a federation token: %v", err) } @@ -298,7 +298,7 @@ func LaunchFedTokManager(ctx context.Context, egrp *errgroup.Group, cache server case <-fedTokTicker.C: // Time to ask the Director for a new token log.Debugln("Refreshing federation token") - tok, err := server_utils.GetFedTok(ctx, cache) + tok, err := server_utils.CreateFedTok(ctx, cache) if err != nil { log.Errorf("Failed to get a federation token: %v", err) continue diff --git a/director/fed_token.go b/director/fed_token.go index d3d3790fe..2cde14e52 100644 --- a/director/fed_token.go +++ b/director/fed_token.go @@ -48,10 +48,10 @@ func validateFedTokRequest(ginCtx *gin.Context) (rInfo requestInfo, err error) { reqParams := getRequestParameters(ginCtx.Request) hNames, exists := reqParams["host"] if !exists || len(hNames) == 0 { - err = fmt.Errorf("no hostname found in the 'host' url parameter") + err = fmt.Errorf("no hostname found in the 'host' url parameter: %s", ginCtx.Request.URL.String()) return } else if len(hNames) > 1 { - err = fmt.Errorf("multiple hostnames found in the 'host' url parameter") + err = fmt.Errorf("multiple hostnames found in the 'host' url parameter: %s", ginCtx.Request.URL.String()) return } rInfo.Host = hNames[0] @@ -59,15 +59,15 @@ func 
validateFedTokRequest(ginCtx *gin.Context) (rInfo requestInfo, err error) { sTypes, exists := reqParams["sType"] var sType server_structs.ServerType if !exists || len(sTypes) == 0 { - err = fmt.Errorf("host '%s' generated request with no server type found in the 'sType' url parameter", rInfo.Host) + err = fmt.Errorf("host '%s' generated request with no server type found in the 'sType' url parameter: %s", rInfo.Host, ginCtx.Request.URL.String()) return } else if len(sTypes) > 1 { - err = fmt.Errorf("host '%s' generated request with multiple server types in the 'sType' url parameter", rInfo.Host) + err = fmt.Errorf("host '%s' generated request with multiple server types in the 'sType' url parameter: %s", rInfo.Host, ginCtx.Request.URL.String()) return } valid := sType.SetString(sTypes[0]) if !valid || (sType != server_structs.CacheType && sType != server_structs.OriginType) { - err = fmt.Errorf("host '%s' generated request with invalid server type '%s' as value of 'sType' url parameter", rInfo.Host, sTypes[0]) + err = fmt.Errorf("host '%s' generated request with invalid server type '%s' as value of 'sType' url parameter: %s", rInfo.Host, sTypes[0], ginCtx.Request.URL.String()) return } rInfo.SType = sType diff --git a/e2e_fed_tests/director_test.go b/e2e_fed_tests/director_test.go index 7d6d77b28..0764da923 100644 --- a/e2e_fed_tests/director_test.go +++ b/e2e_fed_tests/director_test.go @@ -233,7 +233,7 @@ func TestDirectorFedTokenCacheAPI(t *testing.T) { ctx := context.Background() ctx, _, _ = test_utils.TestContext(ctx, t) cache := cache.CacheServer{} - tokStr, err := server_utils.GetFedTok(ctx, &cache) + tokStr, err := server_utils.CreateFedTok(ctx, &cache) require.NoError(t, err, "Failed to get cache's advertisement token") require.NotEmpty(t, tokStr, "Got an empty token") diff --git a/server_utils/server_utils.go b/server_utils/server_utils.go index ebe1d9136..648606536 100644 --- a/server_utils/server_utils.go +++ b/server_utils/server_utils.go @@ -356,7 +356,7 
@@ func GetAdvertisementTok(ctx context.Context, server server_structs.XRootDServer // GetFedTok retrieves a federation token from the Director, which can be passed to other // federation services as proof of federation membership. -func GetFedTok(ctx context.Context, server server_structs.XRootDServer) (string, error) { +func CreateFedTok(ctx context.Context, server server_structs.XRootDServer) (string, error) { // Set up the request to the Director fInfo, err := config.GetFederation(ctx) if err != nil { @@ -444,17 +444,16 @@ func SetFedTok(ctx context.Context, server server_structs.XRootDServer, tok stri dir := filepath.Dir(tokLoc) if err := os.MkdirAll(dir, 0755); err != nil { - if !os.IsExist(err) { - return errors.Wrap(err, "failed to create fed token directories") - } + return errors.Wrap(err, "failed to create fed token directories") } // Create a temporary file for storing the token. Later we'll do an atomic rename - tmpName := filepath.Join(dir, fmt.Sprintf(".fedtoken.%d", time.Now().UnixNano())) - tmpFile, err := os.OpenFile(tmpName, os.O_WRONLY|os.O_CREATE|os.O_EXCL, 0600) + filenamePattern := fmt.Sprintf(".fedtoken.%d.*", time.Now().UnixNano()) + tmpFile, err := os.CreateTemp(dir, filenamePattern) if err != nil { return errors.Wrap(err, "failed to create temporary token file") } + tmpName := tmpFile.Name() defer func() { tmpFile.Close() @@ -472,7 +471,7 @@ func SetFedTok(ctx context.Context, server server_structs.XRootDServer, tok stri } if err := os.Chown(tmpName, uid, gid); err != nil { - return errors.Wrap(err, "failed to change token file ownership") + return errors.Wrapf(err, "failed to change token file ownership of %s to %d:%d", tmpName, uid, gid) } if _, err := tmpFile.WriteString(tok); err != nil { From 82e8ed9db84bfaf1967e5c466703ef61d8c700fc Mon Sep 17 00:00:00 2001 From: Justin Hiemstra Date: Fri, 21 Feb 2025 15:30:44 +0000 Subject: [PATCH 10/11] Add missing component tag to docs/parameters.yaml --- docs/parameters.yaml | 1 + 1 file changed, 
1 insertion(+) diff --git a/docs/parameters.yaml b/docs/parameters.yaml index a5902463d..d5ba19635 100644 --- a/docs/parameters.yaml +++ b/docs/parameters.yaml @@ -1565,6 +1565,7 @@ description: |+ a client certificate from the client. type: bool default: false +components: ["cache"] --- name: Cache.FedTokenLocation description: |+ From f2ca38d8669b8718ca85869ab99e90f8e602928e Mon Sep 17 00:00:00 2001 From: Justin Hiemstra Date: Fri, 21 Feb 2025 15:31:39 +0000 Subject: [PATCH 11/11] Remove erroneous apostrophe from docs --- docs/parameters.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/parameters.yaml b/docs/parameters.yaml index d5ba19635..da3b96e30 100644 --- a/docs/parameters.yaml +++ b/docs/parameters.yaml @@ -1570,7 +1570,7 @@ components: ["cache"] name: Cache.FedTokenLocation description: |+ A path to the file containing a token issued by the federation's issuer. This token may be consumed by other federation services - to prove the cache's membership in the federation. For example, Origin's serving a namespace without DirectReads enabled require + to prove the cache's membership in the federation. For example, Origins serving a namespace without DirectReads enabled require that all clients prove they come from within the federation. type: filename default: $ConfigBase/cache-fed-token