Skip to content

Commit

Permalink
Merge pull request PelicanPlatform#1129 from haoming29/origin-reg-status
Browse files Browse the repository at this point in the history
Add support for origin registration completeness status
  • Loading branch information
haoming29 authored May 3, 2024
2 parents 81427ca + 6ced7a1 commit f317b5a
Show file tree
Hide file tree
Showing 31 changed files with 1,329 additions and 107 deletions.
44 changes: 44 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@ import (
"sync"
"time"

"github.com/go-playground/locales/en"
ut "github.com/go-playground/universal-translator"
en_translations "github.com/go-playground/validator/v10/translations/en"

"github.com/go-playground/validator/v10"
"github.com/pelicanplatform/pelican/param"
"github.com/pkg/errors"
Expand Down Expand Up @@ -159,6 +163,14 @@ var (
// Global struct validator
validate *validator.Validate

// Global translator for the validator
uni *ut.UniversalTranslator

onceValidate sync.Once

// English translator
translator *ut.Translator

// A variable indicating enabled Pelican servers in the current process
enabledServers ServerType
setServerOnce sync.Once
Expand Down Expand Up @@ -213,6 +225,12 @@ func (e *MetadataErr) Unwrap() error {
}

func init() {
en := en.New()
uni = ut.New(en, en)

trans, _ := uni.GetTranslator("en")
translator = &trans

validate = validator.New(validator.WithRequiredStructEnabled())
}

Expand Down Expand Up @@ -746,6 +764,10 @@ func GetValidate() *validator.Validate {
return validate
}

func GetEnTranslator() ut.Translator {
return *translator
}

func handleDeprecatedConfig() {
deprecatedMap := param.GetDeprecated()
for deprecated, replacement := range deprecatedMap {
Expand Down Expand Up @@ -811,6 +833,20 @@ func checkWatermark(wmStr string) (bool, int64, error) {
}
}

func setupTranslation() error {
err := en_translations.RegisterDefaultTranslations(validate, GetEnTranslator())
if err != nil {
return err
}

return validate.RegisterTranslation("required", GetEnTranslator(), func(ut ut.Translator) error {
return ut.Add("required", "{0} is required.", true)
}, func(ut ut.Translator, fe validator.FieldError) string {
t, _ := ut.T("required", fe.Field())
return t
})
}

func InitConfig() {
viper.SetConfigType("yaml")
// 1) Set up defaults.yaml
Expand Down Expand Up @@ -897,6 +933,14 @@ func InitConfig() {
os.Exit(1)
}
handleDeprecatedConfig()

onceValidate.Do(func() {
err = setupTranslation()
})
if err != nil {
log.Errorln("Failed to set up translation for the validator: ", err.Error())
os.Exit(1)
}
}

func initConfigDir() error {
Expand Down
3 changes: 1 addition & 2 deletions director/director_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,7 @@ func checkFilter(serverName string) (bool, filterType) {

// Configure TTL caches to enable cache eviction and other additional cache events handling logic
//
// The `ctx` is the context for listening to server shutdown event in order to cleanup internal cache eviction
// goroutine and `wg` is the wait group to notify when the clean up goroutine finishes
// The `ctx` is the context for listening to server shutdown event in order to cleanup internal cache eviction goroutine
func ConfigTTLCache(ctx context.Context, egrp *errgroup.Group) {
// Start automatic expired item deletion
go serverAds.Start()
Expand Down
9 changes: 9 additions & 0 deletions docs/scopes.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,15 @@ issuedBy: ["*"]
acceptedBy: ["*"]
---
############################
# Registry Scopes #
############################
name: registry.edit_registration
description: >-
For origin admin to edit namespace registration at the registry
issuedBy: ["origin"]
acceptedBy: ["registry"]
---
############################
# Monitoring Scopes #
############################
name: monitoring.scrape
Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -118,8 +118,8 @@ require (
github.com/go-openapi/strfmt v0.21.7 // indirect
github.com/go-openapi/swag v0.22.4 // indirect
github.com/go-openapi/validate v0.22.1 // indirect
github.com/go-playground/locales v0.14.1 // indirect
github.com/go-playground/universal-translator v0.18.1 // indirect
github.com/go-playground/locales v0.14.1
github.com/go-playground/universal-translator v0.18.1
github.com/go-playground/validator/v10 v10.16.0
github.com/goccy/go-json v0.10.2 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
Expand Down
13 changes: 12 additions & 1 deletion launcher_utils/register_namespace.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,15 @@ import (
"net/url"
"time"

"github.com/jellydator/ttlcache/v3"
"github.com/lestrrat-go/jwx/v2/jwk"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
"golang.org/x/sync/errgroup"

"github.com/lestrrat-go/jwx/v2/jwk"
"github.com/pelicanplatform/pelican/config"
"github.com/pelicanplatform/pelican/metrics"
"github.com/pelicanplatform/pelican/origin"
"github.com/pelicanplatform/pelican/param"
"github.com/pelicanplatform/pelican/registry"
"github.com/pelicanplatform/pelican/server_structs"
Expand Down Expand Up @@ -262,6 +264,7 @@ func registerNamespaceImpl(key jwk.Key, prefix string, registrationEndpointURL s
return nil
}

// Register the namespace. If failed, retry every 10s (default)
func RegisterNamespaceWithRetry(ctx context.Context, egrp *errgroup.Group, prefix string) error {
retryInterval := param.Server_RegistrationRetryInterval.GetDuration()
if retryInterval == 0 {
Expand All @@ -276,20 +279,28 @@ func RegisterNamespaceWithRetry(ctx context.Context, egrp *errgroup.Group, prefi
if isRegistered {
metrics.SetComponentHealthStatus(metrics.OriginCache_Registry, metrics.StatusOK, "")
log.Debugf("Origin already has prefix %v registered\n", prefix)
if err := origin.FetchAndSetRegStatus(prefix); err != nil {
return errors.Wrapf(err, "failed to fetch registration status for the prefix %s", prefix)
}
return nil
}

if err = registerNamespaceImpl(key, prefix, url); err == nil {
return nil
}
log.Errorf("Failed to register with namespace service: %v; will automatically retry in 10 seconds\n", err)
// For failed registration, set the status to RegError without a TTL
origin.SetNamespacesStatus(prefix, origin.RegistrationStatus{Status: origin.RegError}, ttlcache.NoTTL)

egrp.Go(func() error {
ticker := time.NewTicker(retryInterval)
for {
select {
case <-ticker.C:
if err := registerNamespaceImpl(key, prefix, url); err == nil {
if err := origin.FetchAndSetRegStatus(prefix); err != nil {
log.Errorf("failed to fetch registration status for the prefix %s: %v", prefix, err)
}
return nil
}
log.Errorf("Failed to register with namespace service: %v; will automatically retry in 10 seconds\n", err)
Expand Down
2 changes: 2 additions & 0 deletions launchers/origin_serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ func OriginServe(ctx context.Context, engine *gin.Engine, egrp *errgroup.Group,
// Set up the APIs for the origin UI
origin.RegisterOriginWebAPI(engine)

origin.ConfigOriginTTLCache(ctx, egrp)

// Director also registers this metadata URL; avoid registering twice.
if !modules.IsEnabled(config.DirectorType) {
server_utils.RegisterOIDCAPI(engine)
Expand Down
15 changes: 15 additions & 0 deletions origin/origin_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ import (
"time"

"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
"golang.org/x/sync/errgroup"

"github.com/pelicanplatform/pelican/config"
"github.com/pelicanplatform/pelican/param"
Expand Down Expand Up @@ -115,3 +117,16 @@ func LaunchOriginFileTestMaintenance(ctx context.Context) {
},
)
}

func ConfigOriginTTLCache(ctx context.Context, egrp *errgroup.Group) {
go registrationsStatus.Start()

egrp.Go(func() error {
<-ctx.Done()
log.Info("Gracefully stopping origin TTL cache eviction...")
registrationsStatus.DeleteAll()
registrationsStatus.Stop()
log.Info("Origin TTL cache eviction has been stopped")
return nil
})
}
71 changes: 64 additions & 7 deletions origin/origin_ui.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,29 +20,86 @@ package origin

import (
"net/http"
"net/url"
"time"

"github.com/gin-gonic/gin"
log "github.com/sirupsen/logrus"

"github.com/pelicanplatform/pelican/config"
"github.com/pelicanplatform/pelican/param"
"github.com/pelicanplatform/pelican/server_structs"
"github.com/pelicanplatform/pelican/server_utils"
"github.com/pelicanplatform/pelican/token"
"github.com/pelicanplatform/pelican/token_scopes"
"github.com/pelicanplatform/pelican/web_ui"
"github.com/sirupsen/logrus"
)

type exportsRes struct {
Type string `json:"type"` // either "posix" or "s3"
Exports []server_utils.OriginExport `json:"exports"`
}
type (
exportsRes struct {
Type string `json:"type"` // either "posix" or "s3"
Exports []exportWithStatus `json:"exports"`
}
)

func handleExports(ctx *gin.Context) {
storageType := param.Origin_StorageType.GetString()
exports, err := server_utils.GetOriginExports()
if err != nil {
logrus.Errorf("Failed to get the origin exports: %v", err)
log.Errorf("Failed to get the origin exports: %v", err)
ctx.JSON(http.StatusInternalServerError, server_structs.SimpleApiResp{Status: server_structs.RespFailed, Msg: "Server encountered error when getting the origin exports: " + err.Error()})
return
}
ctx.JSON(http.StatusOK, exportsRes{Type: storageType, Exports: exports})
wrappedExports, err := wrapExportsByStatus(exports)
if err != nil {
log.Errorf("Failed to get the registration status of the exported prefixes: %v", err)
ctx.JSON(http.StatusInternalServerError, server_structs.SimpleApiResp{Status: server_structs.RespFailed, Msg: "Server encountered error when getting the registration status for the exported prefixes: " + err.Error()})
return
}
// Create token for accessing registry edit page
issuerUrl, err := config.GetServerIssuerURL()
if err != nil {
log.Errorf("Failed to get server issuer url %v", err)
ctx.JSON(http.StatusInternalServerError, server_structs.SimpleApiResp{Status: server_structs.RespFailed, Msg: "Server encountered error when getting server issuer url " + err.Error()})
return
}
fed, err := config.GetFederation(ctx)
if err != nil {
log.Error("handleExports: failed to get federaion:", err)
ctx.JSON(http.StatusInternalServerError, server_structs.SimpleApiResp{
Status: server_structs.RespFailed,
Msg: "Server error when getting federation information: " + err.Error(),
})
}
tc := token.NewWLCGToken()
tc.Issuer = issuerUrl
tc.Lifetime = 15 * time.Minute
tc.Subject = issuerUrl
tc.AddScopes(token_scopes.Registry_EditRegistration)
tc.AddAudiences(fed.NamespaceRegistrationEndpoint)
token, err := tc.CreateToken()
if err != nil {
log.Errorf("Failed to create access token for editing registration %v", err)
ctx.JSON(http.StatusInternalServerError, server_structs.SimpleApiResp{Status: server_structs.RespFailed, Msg: "Server encountered error when creating token for access registry edit page " + err.Error()})
return
}

for idx, export := range wrappedExports {
if export.EditUrl != "" {
parsed, err := url.Parse(export.EditUrl)
if err != nil {
// current editUrl ends with "/?id=<x>"
wrappedExports[idx].EditUrl += "&access_token=" + token
continue
}
exQuery := parsed.Query()
exQuery.Add("access_token", token)
parsed.RawQuery = exQuery.Encode()
wrappedExports[idx].EditUrl = parsed.String()
}
}

ctx.JSON(http.StatusOK, exportsRes{Type: storageType, Exports: wrappedExports})
}

func RegisterOriginWebAPI(engine *gin.Engine) {
Expand Down
Loading

0 comments on commit f317b5a

Please sign in to comment.