Skip to content

Commit

Permalink
Merge pull request #18 from bbockelm/origin_director_registration
Browse files Browse the repository at this point in the history
Have origin advertise to the director service
  • Loading branch information
bbockelm authored Aug 1, 2023
2 parents 696f00c + 4c5f0b6 commit f0f5677
Show file tree
Hide file tree
Showing 8 changed files with 304 additions and 9 deletions.
13 changes: 8 additions & 5 deletions cmd/director_serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"os/signal"
"syscall"

"github.com/pelicanplatform/pelican/config"
"github.com/pelicanplatform/pelican/director"
"github.com/pelicanplatform/pelican/web_ui"
log "github.com/sirupsen/logrus"
Expand All @@ -15,12 +16,14 @@ func serveDirector( /*cmd*/ *cobra.Command /*args*/, []string) error {
log.Info("Initializing Director GeoIP database...")
director.InitializeDB()

log.Info("Generating/advertising server ads...")
if config.GetPreferredPrefix() == "OSDF" {
log.Info("Generating/advertising server ads from OSG topology service...")

// Get the ads from topology, populate the cache, and keep the cache
// updated with fresh info
if err := director.AdvertiseOSDF(); err != nil {
panic(err)
// Get the ads from topology, populate the cache, and keep the cache
// updated with fresh info
if err := director.AdvertiseOSDF(); err != nil {
panic(err)
}
}
go director.PeriodicCacheReload()

Expand Down
8 changes: 8 additions & 0 deletions cmd/origin_serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -499,6 +499,11 @@ func launchXrootd() error {
func serveOrigin(/*cmd*/ *cobra.Command, /*args*/ []string) error {
defer config.CleanupTempResources()

err := config.DiscoverFederation()
if err != nil {
log.Warningln("Failed to do service auto-discovery:", err)
}

monitorPort, err := pelican.ConfigureMonitoring()
if err != nil {
return err
Expand All @@ -517,6 +522,9 @@ func serveOrigin(/*cmd*/ *cobra.Command, /*args*/ []string) error {
if err = origin_ui.ConfigureOriginUI(engine); err != nil {
return err
}
if err = origin_ui.PeriodicAdvertiseOrigin(); err != nil {
return err
}

go web_ui.RunEngine(engine)

Expand Down
16 changes: 14 additions & 2 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,13 +115,17 @@ func GetAllPrefixes() []string {
func DiscoverFederation() error {
federationStr := viper.GetString("FederationURL")
if len(federationStr) == 0 {
log.Debugln("Federation URL is unset; skipping discovery")
return nil
}
log.Debugln("Federation URL:", federationStr)
curDirectorURL := viper.GetString("DirectorURL")
if len(curDirectorURL) != 0 {
curNamespaceURL := viper.GetString("DirectorURL")
if len(curDirectorURL) != 0 && len(curNamespaceURL) != 0 {
return nil
}

log.Debugln("Performing federation service discovery against endpoint", federationStr)
federationUrl, err := url.Parse(federationStr)
if err != nil {
return errors.Wrapf(err, "Invalid federation value %s:", federationStr)
Expand Down Expand Up @@ -163,7 +167,15 @@ func DiscoverFederation() error {
if err != nil {
return errors.Wrapf(err, "Failure when parsing federation metadata at %s", discoveryUrl)
}
viper.Set("DirectorURL", metadata.DirectorEndpoint)
if curDirectorURL == "" {
log.Debugln("Federation service discovery resulted in director URL", metadata.DirectorEndpoint)
viper.Set("DirectorURL", metadata.DirectorEndpoint)
}
if curNamespaceURL == "" {
log.Debugln("Federation service discovery resulted in namespace registration URL",
metadata.NamespaceRegistrationEndpoint)
viper.Set("NamespaceURL", metadata.NamespaceRegistrationEndpoint)
}

return nil
}
Expand Down
1 change: 1 addition & 0 deletions config/resources/osdf.yaml
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
ManagerHost: redirector.osgstorage.org
SummaryMonitoringHost: xrd-report.osgstorage.org
DetailedMonitoringHost: xrd-mon.osgstorage.org
TopologyNamespaceURL: https://topology.opensciencegrid.org/stashcache/namespaces.json
4 changes: 2 additions & 2 deletions director/advertise.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,9 @@ type (

// Populate internal cache with origin/cache ads
func AdvertiseOSDF() error {
namespaceURL := viper.GetString("NamespaceURL")
namespaceURL := viper.GetString("TopologyNamespaceURL")
if namespaceURL == "" {
return errors.New("NamespaceURL configuration option not set")
return errors.New("Topology namespaces.json configuration option (`TopologyNamespaceURL`) not set")
}

req, err := http.NewRequest("GET", namespaceURL, nil)
Expand Down
134 changes: 134 additions & 0 deletions director/origin_api.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
package director

import (
"context"
"errors"
"net/url"
"path"
"strings"
"sync"
"time"

"github.com/jellydator/ttlcache/v3"
"github.com/lestrrat-go/jwx/jwa"
"github.com/lestrrat-go/jwx/jwk"
"github.com/lestrrat-go/jwx/jwt"
"github.com/pelicanplatform/pelican/config"
"github.com/spf13/viper"
)

type (

OriginAdvertise struct {
Name string `json:"name"`
URL string `json:"url"`
Namespaces []NamespaceAd `json:"namespaces"`
}

)

var (
namespaceKeys = ttlcache.New[string, *jwk.AutoRefresh](ttlcache.WithTTL[string, *jwk.AutoRefresh](15 * time.Minute))
namespaceKeysMutex = sync.RWMutex{}
)


func CreateAdvertiseToken(namespace string) (string, error) {
key, err := config.GetOriginJWK()
if err != nil {
return "", err
}
issuer_url, err := GetIssuerURL(namespace)
if err != nil {
return "", err
}
director := viper.GetString("DirectorURL")
if director == "" {
return "", errors.New("Director URL is not known; cannot create advertise token")
}

tok, err := jwt.NewBuilder().
Claim("scope", "pelican.advertise").
Issuer(issuer_url).
Audience([]string{director}).
Subject("origin").
Expiration(time.Now().Add(time.Minute)).
Build()
if err != nil {
return "", err
}

signed, err := jwt.Sign(tok, jwa.ES512, key)
if err != nil {
return "", err
}
return string(signed), nil
}

//
// Given a token and a location in the namespace to advertise in,
// see if the entity is authorized to advertise an origin for the
// namespace
func VerifyAdvertiseToken(token, namespace string) (bool, error) {
issuer_url, err := GetIssuerURL(namespace)
if err != nil {
return false, err
}
var ar *jwk.AutoRefresh
{
namespaceKeysMutex.RLock()
defer namespaceKeysMutex.Unlock()
item := namespaceKeys.Get(namespace)
if !item.IsExpired() {
ar = item.Value()
}
}
ctx := context.Background()
if ar == nil {
ar := jwk.NewAutoRefresh(ctx)
ar.Configure(issuer_url, jwk.WithMinRefreshInterval(15 * time.Minute))
namespaceKeysMutex.Lock()
defer namespaceKeysMutex.Unlock()
namespaceKeys.Set(namespace, ar, ttlcache.DefaultTTL)
}
keyset, err := ar.Fetch(ctx, issuer_url)
if err != nil {
return false, err
}

tok, err := jwt.Parse([]byte(token), jwt.WithKeySet(keyset), jwt.WithValidate(true))
if err != nil {
return false, err
}

scope_any, present := tok.Get("scope")
if !present {
return false, errors.New("No scope is present; required to advertise to director")
}
scope, ok := scope_any.(string)
if !ok {
return false, errors.New("scope claim in token is not string-valued")
}

scopes := strings.Split(scope, " ")

for _, scope := range(scopes) {
if scope == "pelican.advertise" {
return true, nil
}
}
return false, nil
}

func GetIssuerURL(prefix string) (string, error) {
namespace_url_string := viper.GetString("NamespaceURL")
if namespace_url_string == "" {
return "", errors.New("Namespace URL is not set")
}
namespace_url, err := url.Parse(namespace_url_string)
if err != nil {
return "", err
}
namespace_url.Path = path.Join(namespace_url.Path, "namespaces", prefix)
return namespace_url.String(), nil
}
46 changes: 46 additions & 0 deletions director/redirect.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"strings"

"github.com/gin-gonic/gin"
log "github.com/sirupsen/logrus"
)

func getRedirectURL(reqPath string, ad ServerAd, requiresAuth bool) (redirectURL url.URL) {
Expand Down Expand Up @@ -154,8 +155,53 @@ func ShortcutMiddleware() gin.HandlerFunc {
}
}

func RegisterOrigin (ctx *gin.Context) {
tokens, present := ctx.Request.Header["Authorization"]
if !present || len(tokens) == 0 {
ctx.JSON(401, gin.H{"error": "Bearer token not present in the 'Authorization' header"})
return
}
ad := OriginAdvertise{}
if ctx.ShouldBind(&ad) != nil {
ctx.JSON(400, gin.H{"error": "Invalid origin registration"})
return
}

for _, namespace := range(ad.Namespaces) {
ok, err := VerifyAdvertiseToken(tokens[0], namespace.Path)
if err != nil {
log.Warningln("Failed to verify token:", err)
ctx.JSON(400, gin.H{"error": "Authorization token verification failed"})
return
}
if !ok {
log.Warningf("Origin %v advertised to namespace %v without valid registration\n",
ad.Name, namespace.Path)
ctx.JSON(400, gin.H{"error": "Origin not authorized to advertise to this namespace"})
return
}
}

ad_url, err := url.Parse(ad.URL)
if err != nil {
log.Warningf("Failed to parse origin URL %v: %v\n", ad.URL, err)
ctx.JSON(400, gin.H{"error": "Invalid origin URL"})
return
}

originAd := ServerAd{
Name: ad.Name,
AuthURL: *ad_url,
URL: *ad_url,
Type: OriginType,
}
RecordAd(originAd, &ad.Namespaces)
ctx.JSON(200, gin.H{"msg": "Successful registration"})
}

func RegisterDirector(router *gin.RouterGroup) {
// Establish the routes used for cache/origin redirection
router.GET("/api/v1.0/director/object/*any", RedirectToCache)
router.GET("/api/v1.0/director/origin/*any", RedirectToOrigin)
router.POST("/api/v1.0/director/registerOrigin", RegisterOrigin)
}
91 changes: 91 additions & 0 deletions origin_ui/advertise.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
package origin_ui

import (
"bytes"
"crypto/tls"
"encoding/json"
"fmt"
"net/http"
"net/url"
"time"

"github.com/pelicanplatform/pelican/director"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
"github.com/spf13/viper"
)

func PeriodicAdvertiseOrigin() error {
ticker := time.NewTicker(1 * time.Minute)
go func() {
err := AdvertiseOrigin()
if err != nil {
log.Warningln("Origin advertise failed:", err)
}
for {
<-ticker.C
err := AdvertiseOrigin()
if err != nil {
log.Warningln("Origin advertise failed:", err)
}
}
}()

return nil
}

func AdvertiseOrigin() error {
name := viper.GetString("Sitename")
if name == "" {
return errors.New("Origin name isn't set")
}
// TODO: waiting on a different branch to merge origin URL generation
originUrl := "https://localhost:8444"

ad := director.OriginAdvertise{
Name: name,
URL: originUrl,
Namespaces: make([]director.NamespaceAd, 0),
}

body, err := json.Marshal(ad)
if err != nil {
return errors.Wrap(err, "Failed to generate JSON description of origin")
}

directorUrlStr := viper.GetString("DirectorURL")
if directorUrlStr == "" {
return errors.New("Director endpoint URL is not known")
}
directorUrl, err := url.Parse(directorUrlStr)
if err != nil {
return errors.Wrap(err, "Failed to parse DirectorURL")
}
directorUrl.Path = "/api/v1.0/director/registerOrigin"

req, err := http.NewRequest("POST", directorUrl.String(), bytes.NewBuffer(body))
if err != nil {
return errors.Wrap(err, "Failed to create POST request for director registration")
}

req.Header.Set("Content-Type", "application/json")

client := http.Client{}
if viper.GetBool("TLSSkipVerify") {
tr := &http.Transport{
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
}
client = http.Client{Transport: tr}
}
resp, err := client.Do(req)
if err != nil {
return errors.Wrap(err, "Failed to start request for director registration")
}
defer resp.Body.Close()

if resp.StatusCode > 299 {
return fmt.Errorf("Error response %v from director registration: %v", resp.StatusCode, resp.Status)
}

return nil
}

0 comments on commit f0f5677

Please sign in to comment.