Skip to content

Commit

Permalink
Pull common struct into common pkg to eliminate cyclic import
Browse files Browse the repository at this point in the history
  • Loading branch information
haoming29 committed Jan 23, 2024
1 parent 1f719d9 commit 2d9bd8f
Show file tree
Hide file tree
Showing 23 changed files with 279 additions and 245 deletions.
6 changes: 3 additions & 3 deletions cache_ui/advertise.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@
package cache_ui

import (
"github.com/pelicanplatform/pelican/common"
"github.com/pelicanplatform/pelican/config"
"github.com/pelicanplatform/pelican/director"
"github.com/pelicanplatform/pelican/server_utils"
)

Expand All @@ -30,8 +30,8 @@ type (
}
)

func (server *CacheServer) CreateAdvertisement(name string, originUrl string, originWebUrl string) (director.OriginAdvertise, error) {
ad := director.OriginAdvertise{
func (server *CacheServer) CreateAdvertisement(name string, originUrl string, originWebUrl string) (common.OriginAdvertise, error) {
ad := common.OriginAdvertise{
Name: name,
URL: originUrl,
WebURL: originWebUrl,
Expand Down
6 changes: 3 additions & 3 deletions cmd/cache_serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,9 @@ import (
"time"

"github.com/pelicanplatform/pelican/cache_ui"
"github.com/pelicanplatform/pelican/common"
"github.com/pelicanplatform/pelican/config"
"github.com/pelicanplatform/pelican/daemon"
"github.com/pelicanplatform/pelican/director"
"github.com/pelicanplatform/pelican/param"
"github.com/pelicanplatform/pelican/server_ui"
"github.com/pelicanplatform/pelican/server_utils"
Expand All @@ -46,9 +46,9 @@ import (
"golang.org/x/sync/errgroup"
)

func getNSAdsFromDirector() ([]director.NamespaceAd, error) {
func getNSAdsFromDirector() ([]common.NamespaceAd, error) {
// Get the endpoint of the director
var respNS []director.NamespaceAd
var respNS []common.NamespaceAd
directorEndpoint, err := getDirectorEndpoint()
if err != nil {
return respNS, errors.Wrapf(err, "Failed to get DirectorURL from config: %v", err)
Expand Down
97 changes: 97 additions & 0 deletions common/director.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
/***************************************************************
*
* Copyright (C) 2024, Pelican Project, Morgridge Institute for Research
*
* Licensed under the Apache License, Version 2.0 (the "License"); you
* may not use this file except in compliance with the License. You may
* obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
***************************************************************/

// Common pacakge contains shared structs and methods between different Pelican pacakges.
package common

import (
"encoding/json"
"net/url"
)

type (
NamespaceAd struct {
RequireToken bool `json:"requireToken"`
Path string `json:"path"`
Issuer url.URL `json:"url"`
MaxScopeDepth uint `json:"maxScopeDepth"`
Strategy StrategyType `json:"strategy"`
BasePath string `json:"basePath"`
VaultServer string `json:"vaultServer"`
DirlistHost string `json:"dirlisthost"`
}

ServerAd struct {
Name string `json:"name"`
AuthURL url.URL `json:"auth_url"`
URL url.URL `json:"url"` // This is server's XRootD URL for file transfer
WebURL url.URL `json:"web_url"` // This is server's Web interface and API
Type ServerType `json:"type"`
Latitude float64 `json:"latitude"`
Longitude float64 `json:"longitude"`
EnableWrite bool `json:"enable_write"`
EnableFallbackRead bool `json:"enable_fallback_read"` // True if reads from the origin are permitted when no cache is available
}

OriginAdvertise struct {
Name string `json:"name"`
URL string `json:"url"` // This is the url for origin's XRootD service and file transfer
WebURL string `json:"web_url,omitempty"` // This is the url for origin's web engine and APIs
Namespaces []NamespaceAd `json:"namespaces"`
EnableWrite bool `json:"enablewrite"`
EnableFallbackRead bool `json:"enable-fallback-read"` // True if the origin will allow direct client reads when no caches are available
}

ServerType string
StrategyType string
)

const (
CacheType ServerType = "Cache"
OriginType ServerType = "Origin"
)

const (
OAuthStrategy StrategyType = "OAuth2"
VaultStrategy StrategyType = "Vault"
)

func (ad ServerAd) MarshalJSON() ([]byte, error) {
baseAd := struct {
Name string `json:"name"`
AuthURL string `json:"auth_url"`
URL string `json:"url"`
WebURL string `json:"web_url"`
Type ServerType `json:"type"`
Latitude float64 `json:"latitude"`
Longitude float64 `json:"longitude"`
EnableWrite bool `json:"enable_write"`
EnableFallbackRead bool `json:"enable_fallback_read"`
}{
Name: ad.Name,
AuthURL: ad.AuthURL.String(),
URL: ad.URL.String(),
WebURL: ad.WebURL.String(),
Type: ad.Type,
Latitude: ad.Latitude,
Longitude: ad.Longitude,
EnableWrite: ad.EnableWrite,
EnableFallbackRead: ad.EnableFallbackRead,
}
return json.Marshal(baseAd)
}
17 changes: 9 additions & 8 deletions director/advertise.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,13 @@ import (
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"

"github.com/pelicanplatform/pelican/common"
"github.com/pelicanplatform/pelican/param"
"github.com/pelicanplatform/pelican/utils"
)

func parseServerAd(server utils.Server, serverType ServerType) ServerAd {
serverAd := ServerAd{}
func parseServerAd(server utils.Server, serverType common.ServerType) common.ServerAd {
serverAd := common.ServerAd{}
serverAd.Type = serverType
serverAd.Name = server.Resource

Expand Down Expand Up @@ -73,10 +74,10 @@ func AdvertiseOSDF() error {
return errors.Wrapf(err, "Failed to get topology JSON")
}

cacheAdMap := make(map[ServerAd][]NamespaceAd)
originAdMap := make(map[ServerAd][]NamespaceAd)
cacheAdMap := make(map[common.ServerAd][]common.NamespaceAd)
originAdMap := make(map[common.ServerAd][]common.NamespaceAd)
for _, ns := range namespaces.Namespaces {
nsAd := NamespaceAd{}
nsAd := common.NamespaceAd{}
nsAd.RequireToken = ns.UseTokenOnRead
nsAd.Path = ns.Path
nsAd.DirlistHost = ns.DirlistHost
Expand All @@ -87,7 +88,7 @@ func AdvertiseOSDF() error {
}
nsAd.Issuer = *issuerURL
nsAd.MaxScopeDepth = uint(ns.CredentialGeneration.MaxScopeDepth)
nsAd.Strategy = StrategyType(ns.CredentialGeneration.Strategy)
nsAd.Strategy = common.StrategyType(ns.CredentialGeneration.Strategy)
nsAd.BasePath = ns.CredentialGeneration.BasePath
nsAd.VaultServer = ns.CredentialGeneration.VaultServer

Expand All @@ -96,12 +97,12 @@ func AdvertiseOSDF() error {
// they're listed as inactive by topology). These namespaces will all be mapped to the
// same useless origin ad, resulting in a 404 for queries to those namespaces
for _, origin := range ns.Origins {
originAd := parseServerAd(origin, OriginType)
originAd := parseServerAd(origin, common.OriginType)
originAdMap[originAd] = append(originAdMap[originAd], nsAd)
}

for _, cache := range ns.Caches {
cacheAd := parseServerAd(cache, CacheType)
cacheAd := parseServerAd(cache, common.CacheType)
cacheAdMap[cacheAd] = append(cacheAdMap[cacheAd], nsAd)
}
}
Expand Down
9 changes: 5 additions & 4 deletions director/advertise_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/spf13/viper"
"github.com/stretchr/testify/assert"

"github.com/pelicanplatform/pelican/common"
"github.com/pelicanplatform/pelican/utils"
)

Expand All @@ -21,16 +22,16 @@ func TestParseServerAd(t *testing.T) {

// Check that we populate all of the fields correctly -- note that lat/long don't get updated
// until right before the ad is recorded, so we don't check for that here.
ad := parseServerAd(server, OriginType)
ad := parseServerAd(server, common.OriginType)
assert.Equal(t, ad.AuthURL.String(), "https://my-auth-endpoint.com")
assert.Equal(t, ad.URL.String(), "http://my-endpoint.com")
assert.Equal(t, ad.WebURL.String(), "")
assert.Equal(t, ad.Name, "MY_SERVER")
assert.True(t, ad.Type == OriginType)
assert.True(t, ad.Type == common.OriginType)

// A quick check that type is set correctly
ad = parseServerAd(server, CacheType)
assert.True(t, ad.Type == CacheType)
ad = parseServerAd(server, common.CacheType)
assert.True(t, ad.Type == common.CacheType)
}

func JSONHandler(w http.ResponseWriter, r *http.Request) {
Expand Down
90 changes: 13 additions & 77 deletions director/cache_ads.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,90 +19,26 @@
package director

import (
"encoding/json"
"errors"
"fmt"
"net"
"net/netip"
"net/url"
"path"
"strings"
"sync"
"time"

"github.com/jellydator/ttlcache/v3"
"github.com/pelicanplatform/pelican/common"
log "github.com/sirupsen/logrus"
)

type (
NamespaceAd struct {
RequireToken bool `json:"requireToken"`
Path string `json:"path"`
Issuer url.URL `json:"url"`
MaxScopeDepth uint `json:"maxScopeDepth"`
Strategy StrategyType `json:"strategy"`
BasePath string `json:"basePath"`
VaultServer string `json:"vaultServer"`
DirlistHost string `json:"dirlisthost"`
}

ServerAd struct {
Name string `json:"name"`
AuthURL url.URL `json:"auth_url"`
URL url.URL `json:"url"` // This is server's XRootD URL for file transfer
WebURL url.URL `json:"web_url"` // This is server's Web interface and API
Type ServerType `json:"type"`
Latitude float64 `json:"latitude"`
Longitude float64 `json:"longitude"`
EnableWrite bool `json:"enable_write"`
EnableFallbackRead bool `json:"enable_fallback_read"` // True if reads from the origin are permitted when no cache is available
}

ServerType string
StrategyType string
)

const (
CacheType ServerType = "Cache"
OriginType ServerType = "Origin"
)

const (
OAuthStrategy StrategyType = "OAuth2"
VaultStrategy StrategyType = "Vault"
)

var (
serverAds = ttlcache.New[ServerAd, []NamespaceAd](ttlcache.WithTTL[ServerAd, []NamespaceAd](15 * time.Minute))
serverAds = ttlcache.New[common.ServerAd, []common.NamespaceAd](ttlcache.WithTTL[common.ServerAd, []common.NamespaceAd](15 * time.Minute))
serverAdMutex = sync.RWMutex{}
)

func (ad ServerAd) MarshalJSON() ([]byte, error) {
baseAd := struct {
Name string `json:"name"`
AuthURL string `json:"auth_url"`
URL string `json:"url"`
WebURL string `json:"web_url"`
Type ServerType `json:"type"`
Latitude float64 `json:"latitude"`
Longitude float64 `json:"longitude"`
EnableWrite bool `json:"enable_write"`
EnableFallbackRead bool `json:"enable_fallback_read"`
}{
Name: ad.Name,
AuthURL: ad.AuthURL.String(),
URL: ad.URL.String(),
WebURL: ad.WebURL.String(),
Type: ad.Type,
Latitude: ad.Latitude,
Longitude: ad.Longitude,
EnableWrite: ad.EnableWrite,
EnableFallbackRead: ad.EnableFallbackRead,
}
return json.Marshal(baseAd)
}

func RecordAd(ad ServerAd, namespaceAds *[]NamespaceAd) {
func RecordAd(ad common.ServerAd, namespaceAds *[]common.NamespaceAd) {
if err := UpdateLatLong(&ad); err != nil {
log.Debugln("Failed to lookup GeoIP coordinates for host", ad.URL.Host)
}
Expand All @@ -111,7 +47,7 @@ func RecordAd(ad ServerAd, namespaceAds *[]NamespaceAd) {
serverAds.Set(ad, *namespaceAds, ttlcache.DefaultTTL)
}

func UpdateLatLong(ad *ServerAd) error {
func UpdateLatLong(ad *common.ServerAd) error {
if ad == nil {
return errors.New("Cannot provide a nil ad to UpdateLatLong")
}
Expand All @@ -136,8 +72,8 @@ func UpdateLatLong(ad *ServerAd) error {
return nil
}

func matchesPrefix(reqPath string, namespaceAds []NamespaceAd) *NamespaceAd {
var best *NamespaceAd
func matchesPrefix(reqPath string, namespaceAds []common.NamespaceAd) *common.NamespaceAd {
var best *common.NamespaceAd

for _, namespace := range namespaceAds {
serverPath := namespace.Path
Expand Down Expand Up @@ -166,15 +102,15 @@ func matchesPrefix(reqPath string, namespaceAds []NamespaceAd) *NamespaceAd {
// Make the len comparison with tmpBest, because serverPath is one char longer now
if strings.HasPrefix(reqPath, serverPath) && len(serverPath) > len(tmpBest) {
if best == nil {
best = new(NamespaceAd)
best = new(common.NamespaceAd)
}
*best = namespace
}
}
return best
}

func GetAdsForPath(reqPath string) (originNamespace NamespaceAd, originAds []ServerAd, cacheAds []ServerAd) {
func GetAdsForPath(reqPath string) (originNamespace common.NamespaceAd, originAds []common.ServerAd, cacheAds []common.ServerAd) {
serverAdMutex.RLock()
defer serverAdMutex.RUnlock()

Expand All @@ -186,32 +122,32 @@ func GetAdsForPath(reqPath string) (originNamespace NamespaceAd, originAds []Ser
// Iterate through all of the server ads. For each "item", the key
// is the server ad itself (either cache or origin), and the value
// is a slice of namespace prefixes are supported by that server
var best *NamespaceAd
var best *common.NamespaceAd
for _, item := range serverAds.Items() {
if item == nil {
continue
}
serverAd := item.Key()
if serverAd.Type == OriginType {
if serverAd.Type == common.OriginType {
if ns := matchesPrefix(reqPath, item.Value()); ns != nil {
if best == nil || len(ns.Path) > len(best.Path) {
best = ns
// If anything was previously set by a namespace that constituted a shorter
// prefix, we overwrite that here because we found a better ns. We also clear
// the other slice of server ads, because we know those aren't good anymore
originAds = append(originAds[:0], serverAd)
cacheAds = []ServerAd{}
cacheAds = []common.ServerAd{}
} else if ns.Path == best.Path {
originAds = append(originAds, serverAd)
}
}
continue
} else if serverAd.Type == CacheType {
} else if serverAd.Type == common.CacheType {
if ns := matchesPrefix(reqPath, item.Value()); ns != nil {
if best == nil || len(ns.Path) > len(best.Path) {
best = ns
cacheAds = append(cacheAds[:0], serverAd)
originAds = []ServerAd{}
originAds = []common.ServerAd{}
} else if ns.Path == best.Path {
cacheAds = append(cacheAds, serverAd)
}
Expand Down
Loading

0 comments on commit 2d9bd8f

Please sign in to comment.