Skip to content

Commit

Permalink
Remove use of global fed metadata within client
Browse files Browse the repository at this point in the history
This is v2 of PR PelicanPlatform#798 since the merge conflicts were becoming very
tedious. Therefore, I took the changes from that PR and converted it
over to a new branch. You can get the changes split up a little better
from that PR as well but I will do my best to list them here:

Changes include:
- Added a ttl cache for storing federation url metadata so we do not
  have to look up each time
- Improved unit tests for client commands, Note: for pelican prefix and
  osdf:// url scheme, it's hard to test since we use osg-htc.org for
  metadata lookup. Therefore it is hard to work with fed in a box so I
  just check for the proper metadata lookup
- Added function called discoverUrlFederation which discovers a
  federation from a url and does not populate global metadata config
  values
- Added a function called schemeUnderstood to take some repeated code
  within client into one function to check for understood url schemes
- Added unit tests for discoverUrlFederation and schemeUnderstood
  • Loading branch information
joereuss12 committed Mar 13, 2024
1 parent cdff9a6 commit 36f7f82
Show file tree
Hide file tree
Showing 13 changed files with 659 additions and 497 deletions.
489 changes: 204 additions & 285 deletions client/fed_test.go

Large diffs are not rendered by default.

160 changes: 125 additions & 35 deletions client/handle_http.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,11 @@ import (

"github.com/VividCortex/ewma"
"github.com/google/uuid"
"github.com/jellydator/ttlcache/v3"
"github.com/lestrrat-go/option"
"github.com/opensaucerer/grab/v3"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
"github.com/spf13/viper"
"github.com/studio-b12/gowebdav"
"github.com/vbauerster/mpb/v8"
"golang.org/x/sync/errgroup"
Expand All @@ -59,6 +59,27 @@ import (
var (
progressCtrOnce sync.Once
progressCtr *mpb.Progress

PelicanURLCache = ttlcache.New[string, PelicanURL](
ttlcache.WithTTL[string, PelicanURL](30*time.Minute),
ttlcache.WithLoader[string, PelicanURL](loader),
)

loader = ttlcache.LoaderFunc[string, PelicanURL](
func(c *ttlcache.Cache[string, PelicanURL], key string) *ttlcache.Item[string, PelicanURL] {
urlFederation, err := config.DiscoverUrlFederation(key)
if err != nil {
return nil
}
// Set our local url metadata
item := c.Set(key, PelicanURL{
DirectorUrl: urlFederation.DirectorEndpoint,
DiscoveryUrl: key,
RegistryUrl: urlFederation.NamespaceRegistrationEndpoint,
}, ttlcache.DefaultTTL)
return item
},
)
)

type (
Expand Down Expand Up @@ -233,6 +254,13 @@ type (
NeedsToken bool
PackOption string
}

PelicanURL struct {
ObjectUrl *url.URL
DiscoveryUrl string
DirectorUrl string
RegistryUrl string
}
)

const (
Expand Down Expand Up @@ -334,6 +362,86 @@ func (tr TransferResults) ID() string {
return tr.jobId.String()
}

func NewPelicanURL(remoteUrl *url.URL, scheme string) (pelicanURL PelicanURL, err error) {
if remoteUrl.Host != "" {
if scheme == "osdf" || scheme == "stash" {
// in the osdf/stash case, fix url's that have a hostname
remoteUrl.Path, err = url.JoinPath(remoteUrl.Host, remoteUrl.Path)
if err != nil {
log.Errorln("Failed to join remote destination url path:", err)
return PelicanURL{}, err
}
} else if scheme == "pelican" {
// If we have a host and url is pelican, we need to extract federation data from the host
log.Debugln("Detected pelican:// url, getting federation metadata from specified host")
federationUrl := &url.URL{}
// federationUrl, _ := url.Parse(remoteUrl.String())
federationUrl.Scheme = "https"
federationUrl.Path = ""
federationUrl.Host = remoteUrl.Host

// Check if cache has key of federationURL, if not, loader will add it:
pelicanUrlItem := PelicanURLCache.Get(federationUrl.String())
if pelicanUrlItem != nil {
pelicanURL = pelicanUrlItem.Value()
} else {
return PelicanURL{}, fmt.Errorf("Issue getting metadata information from cache. Provided url: %s", federationUrl.String())
}
}
}

// With an osdf:// url scheme, we assume the user will be using the OSDF so load in our osdf metadata for our url
if scheme == "osdf" {
// If we are using an osdf/stash binary, we discovered the federation already --> load into local url metadata
if config.GetPreferredPrefix() == "OSDF" {
log.Debugln("Detected an osdf binary with an osdf:// url, populating metadata with osdf defaults")
if param.Federation_DirectorUrl.GetString() == "" || param.Federation_DiscoveryUrl.GetString() == "" || param.Federation_RegistryUrl.GetString() == "" {
return PelicanURL{}, fmt.Errorf("osdf default metadata is not populated in config")
} else {
pelicanURL.DirectorUrl = param.Federation_DirectorUrl.GetString()
pelicanURL.DiscoveryUrl = param.Federation_DiscoveryUrl.GetString()
pelicanURL.RegistryUrl = param.Federation_RegistryUrl.GetString()
}
} else if config.GetPreferredPrefix() == "PELICAN" {
// We hit this case when we are using a pelican binary but an osdf:// url, therefore we need to disover the osdf federation
log.Debugln("Detected an pelican binary with an osdf:// url, populating metadata with osdf defaults")
// Check if cache has key of federationURL, if not, loader will add it:
pelicanUrlItem := PelicanURLCache.Get("osg-htc.org")
if pelicanUrlItem != nil {
pelicanURL = pelicanUrlItem.Value()
} else {
return PelicanURL{}, fmt.Errorf("Issue getting metadata information from cache")
}
}
} else if scheme == "pelican" && remoteUrl.Host == "" {
// We hit this case when we do not have a hostname with a pelican:// url
if param.Federation_DiscoveryUrl.GetString() == "" {
return PelicanURL{}, fmt.Errorf("Pelican url scheme without discovery-url detected, please provide a federation discovery-url within the hostname or with the -f flag")
} else {
// Check if cache has key of federationURL, if not, loader will add it:
pelicanUrlItem := PelicanURLCache.Get(param.Federation_DiscoveryUrl.GetString())
if pelicanUrlItem != nil {
pelicanURL = pelicanUrlItem.Value()
} else {
return PelicanURL{}, fmt.Errorf("Issue getting metadata information from cache")
}
}
} else if scheme == "" {
// If we don't have a url scheme, then our metadata information should be in the config
log.Debugln("No url scheme detected, getting metadata information from configuration")
pelicanURL.DirectorUrl = param.Federation_DirectorUrl.GetString()
pelicanURL.DiscoveryUrl = param.Federation_DiscoveryUrl.GetString()
pelicanURL.RegistryUrl = param.Federation_RegistryUrl.GetString()

// If the values do not exist, exit with failure
if pelicanURL.DirectorUrl == "" || pelicanURL.DiscoveryUrl == "" || pelicanURL.RegistryUrl == "" {
return PelicanURL{}, fmt.Errorf("Missing metadata information in config, ensure Federation DirectorUrl, RegistryUrl, and DiscoverUrl are all set")
}
}
pelicanURL.ObjectUrl = remoteUrl
return pelicanURL, nil
}

// Returns a new transfer engine object whose lifetime is tied
// to the provided context. Will launcher worker goroutines to
// handle the underlying transfers
Expand Down Expand Up @@ -736,14 +844,25 @@ func (te *TransferEngine) runJobHandler() error {
//
// The returned object can be further customized as desired.
// This function does not "submit" the job for execution.
func (tc *TransferClient) NewTransferJob(remoteUrl *url.URL, localPath string, upload bool, recursive bool, options ...TransferOption) (tj *TransferJob, err error) {
func (tc *TransferClient) NewTransferJob(remoteUrl *url.URL, localPath string, scheme string, upload bool, recursive bool, options ...TransferOption) (tj *TransferJob, err error) {

id, err := uuid.NewV7()
if err != nil {
return
}

copyUrl := *remoteUrl // Make a copy of the input URL to avoid concurrent issues.
pelicanURL, err := NewPelicanURL(remoteUrl, scheme)
if err != nil {
err = errors.Wrap(err, "error generating metadata for specified url")
return
}

if pelicanURL.ObjectUrl == nil {
err = errors.Wrap(err, "No object url found")
return
}

copyUrl := *pelicanURL.ObjectUrl // Make a copy of the input URL to avoid concurrent issues.
tj = &TransferJob{
caches: tc.caches,
recursive: recursive,
Expand Down Expand Up @@ -773,40 +892,11 @@ func (tc *TransferClient) NewTransferJob(remoteUrl *url.URL, localPath string, u
}
}

if remoteUrl.Scheme == "pelican" && remoteUrl.Host != "" {
fd := config.GetFederation()
defer config.SetFederation(fd)
config.SetFederation(config.FederationDiscovery{})
fedUrlCopy := *remoteUrl
fedUrlCopy.Scheme = "https"
fedUrlCopy.Path = ""
fedUrlCopy.RawFragment = ""
fedUrlCopy.RawQuery = ""
viper.Set("Federation.DiscoveryUrl", fedUrlCopy.String())
if err = config.DiscoverFederation(); err != nil {
return
}
} else if remoteUrl.Scheme == "osdf" {
if remoteUrl.Host != "" {
remoteUrl.Path = path.Clean(path.Join("/", remoteUrl.Host, remoteUrl.Path))
}
fd := config.GetFederation()
defer config.SetFederation(fd)
config.SetFederation(config.FederationDiscovery{})
fedUrl := &url.URL{}
fedUrl.Scheme = "https"
fedUrl.Host = "osg-htc.org"
viper.Set("Federation.DiscoveryUrl", fedUrl.String())
if err = config.DiscoverFederation(); err != nil {
return
}
}

tj.useDirector = param.Federation_DirectorUrl.GetString() != ""
ns, err := getNamespaceInfo(remoteUrl.Path, param.Federation_DirectorUrl.GetString(), upload)
tj.useDirector = pelicanURL.DirectorUrl != ""
ns, err := getNamespaceInfo(pelicanURL.ObjectUrl.Path, pelicanURL.DirectorUrl, upload)
if err != nil {
log.Errorln(err)
err = errors.Wrapf(err, "failed to get namespace information for remote URL %s", remoteUrl)
err = errors.Wrapf(err, "failed to get namespace information for remote URL %s", pelicanURL.ObjectUrl)
}
tj.namespace = ns

Expand Down
121 changes: 121 additions & 0 deletions client/handle_http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ package client
import (
"bytes"
"context"
"encoding/json"
"net"
"net/http"
"net/http/httptest"
Expand Down Expand Up @@ -450,3 +451,123 @@ func TestSortAttempts(t *testing.T) {
assert.Equal(t, svr2.URL, results[0].Url.String())
assert.Equal(t, svr3.URL, results[1].Url.String())
}

func TestNewPelicanURL(t *testing.T) {
t.Run("TestOsdfOrStashSchemeWithOSDFPrefixNoError", func(t *testing.T) {
viper.Reset()
config.SetPreferredPrefix("OSDF")
remoteObject := "osdf:///something/somewhere/thatdoesnotexist.txt"
remoteObjectURL, err := url.Parse(remoteObject)
assert.NoError(t, err)

// Instead of relying on osdf, let's just set our global metadata (osdf prefix does this for us)
viper.Set("Federation.DirectorUrl", "someDirectorUrl")
viper.Set("Federation.DiscoveryUrl", "someDiscoveryUrl")
viper.Set("Federation.RegistryUrl", "someRegistryUrl")

pelicanURL, err := NewPelicanURL(remoteObjectURL, "osdf")
assert.NoError(t, err)

// Check pelicanURL properly filled out
assert.Equal(t, "someDirectorUrl", pelicanURL.DirectorUrl)
assert.Equal(t, "someDiscoveryUrl", pelicanURL.DiscoveryUrl)
assert.Equal(t, "someRegistryUrl", pelicanURL.RegistryUrl)
assert.Equal(t, remoteObjectURL, pelicanURL.ObjectUrl)
viper.Reset()
})

t.Run("TestOsdfOrStashSchemeWithOSDFPrefixWithError", func(t *testing.T) {
viper.Reset()
config.SetPreferredPrefix("OSDF")
remoteObject := "osdf:///something/somewhere/thatdoesnotexist.txt"
remoteObjectURL, err := url.Parse(remoteObject)
assert.NoError(t, err)

// Instead of relying on osdf, let's just set our global metadata but don't set one piece
viper.Set("Federation.DirectorUrl", "someDirectorUrl")
viper.Set("Federation.DiscoveryUrl", "someDiscoveryUrl")

_, err = NewPelicanURL(remoteObjectURL, "osdf")
// Make sure we get an error
assert.Error(t, err)
viper.Reset()
})

t.Run("TestOsdfOrStashSchemeWithPelicanPrefixNoError", func(t *testing.T) {
viper.Reset()
config.SetPreferredPrefix("PELICAN")
remoteObject := "osdf:///something/somewhere/thatdoesnotexist.txt"
remoteObjectURL, err := url.Parse(remoteObject)
assert.NoError(t, err)

pelicanURL, err := NewPelicanURL(remoteObjectURL, "osdf")
assert.NoError(t, err)

// Check pelicanURL properly filled out
assert.Equal(t, "https://osdf-director.osg-htc.org", pelicanURL.DirectorUrl)
assert.Equal(t, "osg-htc.org", pelicanURL.DiscoveryUrl)
assert.Equal(t, "https://osdf-registry.osg-htc.org", pelicanURL.RegistryUrl)
assert.Equal(t, remoteObjectURL, pelicanURL.ObjectUrl)
viper.Reset()
// Note: can't really test this for an error since that would require osg-htc.org to be down
})

t.Run("TestPelicanSchemeNoError", func(t *testing.T) {
viper.Reset()
viper.Set("TLSSkipVerify", true)
err := config.InitClient()
assert.NoError(t, err)
// Create a server that gives us a mock response
server := httptest.NewTLSServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
// make our response:
response := config.FederationDiscovery{
DirectorEndpoint: "director",
NamespaceRegistrationEndpoint: "registry",
JwksUri: "jwks",
BrokerEndpoint: "broker",
}

responseJSON, err := json.Marshal(response)
if err != nil {
http.Error(w, "Internal Server Error", http.StatusInternalServerError)
return
}

w.WriteHeader(http.StatusOK)
_, err = w.Write(responseJSON)
assert.NoError(t, err)
}))
defer server.Close()

serverURL, err := url.Parse(server.URL)
assert.NoError(t, err)

remoteObject := "pelican://" + serverURL.Host + "/something/somewhere/thatdoesnotexist.txt"
remoteObjectURL, err := url.Parse(remoteObject)
assert.NoError(t, err)

pelicanURL, err := NewPelicanURL(remoteObjectURL, "pelican")
assert.NoError(t, err)

// Check pelicanURL properly filled out
assert.Equal(t, "director", pelicanURL.DirectorUrl)
assert.Equal(t, server.URL, pelicanURL.DiscoveryUrl)
assert.Equal(t, "registry", pelicanURL.RegistryUrl)
assert.Equal(t, remoteObjectURL, pelicanURL.ObjectUrl)
// Check to make sure it was populated in our cache
assert.True(t, PelicanURLCache.Has(pelicanURL.DiscoveryUrl))
viper.Reset()
})

t.Run("TestPelicanSchemeWithError", func(t *testing.T) {
viper.Reset()

remoteObject := "pelican://some-host/something/somewhere/thatdoesnotexist.txt"
remoteObjectURL, err := url.Parse(remoteObject)
assert.NoError(t, err)

_, err = NewPelicanURL(remoteObjectURL, "pelican")
assert.Error(t, err)
viper.Reset()
})
}
Loading

0 comments on commit 36f7f82

Please sign in to comment.