Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Director capability into the Pelican CLI #7

Merged
merged 4 commits into from
Jul 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 19 additions & 9 deletions cmd/origin_serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,8 @@ func init() {
viper.SetDefault("TLSCertificate", "/etc/pelican/certificates/tls.crt")
viper.SetDefault("TLSKey", "/etc/pelican/certificates/tls.key")
viper.SetDefault("XrootdRun", "/run/pelican/xrootd")
viper.SetDefault("GeoIPLocation", "/run/pelican/geoip/GeoIP2.mmdb")
viper.SetDefault("MaxMindKeyFile", "/run/pelican/maxmind/maxmind.key")
viper.SetDefault("RobotsTxtFile", "/etc/pelican/robots.txt")
viper.SetDefault("ScitokensConfig", "/etc/pelican/xrootd/scitokens.cfg")
viper.SetDefault("Authfile", "/etc/pelican/xrootd/authfile")
Expand All @@ -109,20 +111,28 @@ func init() {
viper.SetDefault("Authfile", filepath.Join(configBase, "xrootd", "authfile"))
viper.SetDefault("MacaroonsKeyFile", filepath.Join(configBase, "macaroons-secret"))
viper.SetDefault("IssuerKey", filepath.Join(configBase, "issuer.jwk"))
viper.SetDefault("MaxMindKeyFile", filepath.Join(configBase, "maxmind.key"))

var runtimeDir string
if userRuntimeDir := os.Getenv("XDG_RUNTIME_DIR"); userRuntimeDir != "" {
runtimeDir := filepath.Join(userRuntimeDir, "pelican")
err := os.MkdirAll(runtimeDir, 0750)
if err != nil {
cobra.CheckErr(err)
}
viper.SetDefault("XrootdRun", runtimeDir)
runtimeDir = filepath.Join(userRuntimeDir, "pelican")
} else {
dir, err := os.MkdirTemp("", "pelican-xrootd-*")
runtimeDir, err = os.MkdirTemp("", "pelican-xrootd-*")
cobra.CheckErr(err)
cleanupDirOnShutdown(runtimeDir)
}
xrootdRuntimeDir := filepath.Join(runtimeDir, "xrootd")
if err = os.MkdirAll(xrootdRuntimeDir, 0750); err != nil {
cobra.CheckErr(err)
viper.SetDefault("XrootdRun", dir)
cleanupDirOnShutdown(dir)
}
viper.SetDefault("XrootdRun", xrootdRuntimeDir)

geoipRuntimeDir := filepath.Join(runtimeDir, "geoip")
if err = os.MkdirAll(geoipRuntimeDir, 0750); err != nil {
cobra.CheckErr(err)
}
viper.SetDefault("GeoIPLocation", filepath.Join(geoipRuntimeDir, "GeoIP2.mmdb"))

viper.SetDefault("XrootdMultiuser", false)
}
viper.SetDefault("TLSCertFile", "/etc/pki/tls/cert.pem")
Expand Down
122 changes: 122 additions & 0 deletions director/advertise.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
package director

import (
"encoding/json"
"errors"
"io"
"net/http"
"net/url"

log "github.com/sirupsen/logrus"
"github.com/spf13/viper"
)

type (
Cache struct {
AuthEndpoint string `json:"auth_endpoint"`
Endpoint string `json:"endpoint"`
Resource string `json:"resource"`
}

CredentialGeneration struct {
BasePath string `json:"base_path"`
Issuer string `json:"issuer"`
MaxScopeDepth int `json:"max_scope_depth"`
Strategy string `json:"strategy"`
VaultIssuer string `json:"vault_issuer"`
VaultServer string `json:"vault_server"`
}

Namespace struct {
Caches []Cache `json:"caches"`
CredentialGeneration CredentialGeneration `json:"credential_generation"`
DirlistHost string `json:"dirlisthost"`
Path string `json:"path"`
ReadHTTPS bool `json:"readhttps"`
UseTokenOnRead bool `json:"usetokenonread"`
WritebackHost string `json:"writebackhost"`
}

NamespaceJSON struct {
Caches []Cache `json:"caches"`
Namespaces []Namespace `json:"namespaces"`
}
)

func AdvertiseOSDF() error {
namespaceURL := viper.GetString("NamespaceURL")
if namespaceURL == "" {
return errors.New("NamespaceURL configuration option not set")
}

req, err := http.NewRequest("GET", namespaceURL, nil)
if err != nil {
return err
}

req.Header.Set("Accept", "application/json")

client := http.Client{}
resp, err := client.Do(req)
if err != nil {
return err
}

defer resp.Body.Close()

respBytes, err := io.ReadAll(resp.Body)
if err != nil {
return err
}

var namespaces NamespaceJSON
if err = json.Unmarshal(respBytes, &namespaces); err != nil {
return err
}

cacheAdMap := make(map[ServerAd][]NamespaceAd)

for _, ns := range namespaces.Namespaces {
originAd := ServerAd{}
originNameStr := ns.WritebackHost
originURL, err := url.Parse(originNameStr)
if err != nil {
return err
}
originAd.Name = originURL.Host
originAd.URL = *originURL
originAd.Type = OriginType

originNS := NamespaceAd{}
originNS.RequireToken = ns.UseTokenOnRead
originNS.Path = ns.Path
issuerURL, err := url.Parse(ns.CredentialGeneration.Issuer)
if err != nil {
return err
}
originNS.Issuer = *issuerURL
originNS.MaxScopeDepth = uint(ns.CredentialGeneration.MaxScopeDepth)
originNS.Strategy = StrategyType(ns.CredentialGeneration.Strategy)
originNS.BasePath = ns.CredentialGeneration.BasePath
originNS.VaultServer = ns.CredentialGeneration.VaultServer

RecordAd(originAd, &[]NamespaceAd{originNS})

for _, cache := range ns.Caches {
cacheAd := ServerAd{}
cacheAd.Type = CacheType
cacheAd.Name = cache.Resource
cacheURL, err := url.Parse(cache.AuthEndpoint)
if err != nil {
log.Warningf("Namespace JSON returned cache %s with invalid URL %s",
cache.Resource, cache.AuthEndpoint)
}
cacheAd.URL = *cacheURL
cacheNS := NamespaceAd{}
cacheNS.Path = ns.Path
cacheNS.RequireToken = ns.UseTokenOnRead
cacheAdMap[cacheAd] = append(cacheAdMap[cacheAd], cacheNS)
}
}
return nil
}
126 changes: 126 additions & 0 deletions director/cache_ads.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
package director

import (
"errors"
"fmt"
"net"
"net/netip"
"net/url"
"path"
"strings"
"sync"
"time"

"github.com/jellydator/ttlcache/v3"
log "github.com/sirupsen/logrus"
)

type (
NamespaceAd struct {
RequireToken bool
Path string
Issuer url.URL
MaxScopeDepth uint
Strategy StrategyType
BasePath string
VaultServer string
}

ServerAd struct {
Name string
URL url.URL
Type ServerType
Latitude float64
Longitude float64
}

ServerType string
StrategyType string
)

const (
CacheType ServerType = "Cache"
OriginType ServerType = "Origin"
)

const (
OAuthStrategy StrategyType = "OAuth2"
VaultStrategy StrategyType = "Vault"
)


var (
serverAds = ttlcache.New[ServerAd, []NamespaceAd](ttlcache.WithTTL[ServerAd, []NamespaceAd](15 * time.Minute))
serverAdMutex = sync.RWMutex{}
)

func RecordAd(ad ServerAd, namespaceAds *[]NamespaceAd) {
if err := UpdateLatLong(&ad); err != nil {
log.Debugln("Failed to lookup GeoIP coordinates for host", ad.URL.Host)
}
serverAdMutex.Lock()
defer serverAdMutex.Unlock()

serverAds.Set(ad, *namespaceAds, ttlcache.DefaultTTL)
}

func UpdateLatLong(ad *ServerAd) error {
if ad == nil {
return errors.New("Cannot provide a nil ad to UpdateLatLong")
}
hostname := strings.Split(ad.URL.Host, ":")[0]
ip, err := net.LookupIP(hostname)
if err != nil {
return err
}
if len(ip) == 0 {
return fmt.Errorf("Unable to find an IP address for hostname %s", hostname)
}
addr, ok := netip.AddrFromSlice(ip[0])
if !ok {
return errors.New("Failed to create address object from IP")
}
lat, long, err := GetLatLong(addr)
if err != nil {
return err
}
ad.Latitude = lat
ad.Longitude = long
return nil
}

func matchesPrefix(reqPath string, namespaceAds []NamespaceAd) *NamespaceAd {
for _, namespace := range namespaceAds {
serverPath := namespace.Path
if serverPath == reqPath {
return &namespace
}
serverPath += "/"
if strings.HasPrefix(reqPath, serverPath) {
return &namespace
}
}
return nil
}

func GetCacheAdsForPath(reqPath string) (originNamespace NamespaceAd, ads []ServerAd) {
serverAdMutex.RLock()
defer serverAdMutex.Unlock()
reqPath = path.Clean(reqPath)
for _, item := range serverAds.Items() {
if item == nil {
continue
}
serverAd := item.Key()
if serverAd.Type == OriginType {
ns := matchesPrefix(reqPath, item.Value())
if ns != nil {
originNamespace = *ns
}
continue
} else if serverAd.Type == CacheType && matchesPrefix(reqPath, item.Value()) != nil{
ads = append(ads, serverAd)
}
}
return
}
40 changes: 40 additions & 0 deletions director/geosort.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package director

import (
"math"
)

// Mathematical function, not implementation, came from
// http://www.johndcook.com/python_longitude_latitude.html
func distanceOnSphere(lat1 float64, long1 float64, lat2 float64, long2 float64) float64 {

if (lat1 == lat2) && (long1 == long2) {
return 0.0
}

// Convert latitude and longitude to
// spherical coordinates in radians.
degrees_to_radians := math.Pi/180.0

// phi = 90 - latitude
phi1 := (90.0 - lat1)*degrees_to_radians
phi2 := (90.0 - lat2)*degrees_to_radians

// theta = longitude
theta1 := long1*degrees_to_radians
theta2 := long2*degrees_to_radians

// Compute spherical distance from spherical coordinates.

// For two locations in spherical coordinates
// (1, theta, phi) and (1, theta, phi)
// cosine( arc length ) =
// sin phi sin phi' cos(theta-theta') + cos phi cos phi'
// distance = rho * arc length

cos := (math.Sin(phi1) * math.Sin(phi2) * math.Cos(theta1 - theta2) +
math.Cos(phi1) * math.Cos(phi2))
arc := math.Acos( cos )

return arc
}
Loading