Skip to content

Commit

Permalink
Remove use of global fed metadata within client
Browse files Browse the repository at this point in the history
This is v2 of PR PelicanPlatform#798 since the merge conflicts were becoming very
tedious. Therefore, I took the changes from that PR and converted it
over to a new branch. You can get the changes split up a little better
from that PR as well but I will do my best to list them here:

Changes include:
- Added a ttl cache for storing federation url metadata so we do not
  have to look up each time
- Improved unit tests for client commands, Note: for pelican prefix and
  osdf:// url scheme, it's hard to test since we use osg-htc.org for
  metadata lookup. Therefore it is hard to work with fed in a box so I
  just check for the proper metadata lookup
- Added function called discoverUrlFederation which discovers a
  federation from a url and does not populate global metadata config
  values
- Added a function called schemeUnderstood to take some repeated code
  within client into one function to check for understood url schemes
- Added unit tests for discoverUrlFederation and schemeUnderstood
  • Loading branch information
joereuss12 committed Mar 1, 2024
1 parent 9e7c0b7 commit 1ab977d
Show file tree
Hide file tree
Showing 10 changed files with 683 additions and 475 deletions.
162 changes: 126 additions & 36 deletions client/handle_http.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,11 @@ import (

"github.com/VividCortex/ewma"
"github.com/google/uuid"
"github.com/jellydator/ttlcache/v3"
"github.com/lestrrat-go/option"
"github.com/opensaucerer/grab/v3"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
"github.com/spf13/viper"
"github.com/studio-b12/gowebdav"
"github.com/vbauerster/mpb/v8"
"golang.org/x/sync/errgroup"
Expand All @@ -58,6 +58,27 @@ import (
var (
progressCtrOnce sync.Once
progressCtr *mpb.Progress

PelicanURLCache = ttlcache.New[string, PelicanURL](
ttlcache.WithTTL[string, PelicanURL](30*time.Minute),
ttlcache.WithLoader[string, PelicanURL](loader),
)

loader = ttlcache.LoaderFunc[string, PelicanURL](
func(c *ttlcache.Cache[string, PelicanURL], key string) *ttlcache.Item[string, PelicanURL] {
urlFederation, err := config.DiscoverUrlFederation(key)
if err != nil {
return nil
}
// Set our local url metadata
item := c.Set(key, PelicanURL{
directorUrl: urlFederation.DirectorEndpoint,
discoveryUrl: key,
registryUrl: urlFederation.NamespaceRegistrationEndpoint,
}, ttlcache.DefaultTTL)
return item
},
)
)

type (
Expand Down Expand Up @@ -225,6 +246,13 @@ type (
NeedsToken bool
PackOption string
}

PelicanURL struct {
objectUrl *url.URL
discoveryUrl string
directorUrl string
registryUrl string
}
)

const (
Expand Down Expand Up @@ -322,6 +350,86 @@ func newTransferResults(job *TransferJob) TransferResults {
}
}

func newPelicanURL(remoteUrl *url.URL, scheme string) (pelicanURL PelicanURL, err error) {
if remoteUrl.Host != "" {
if scheme == "osdf" || scheme == "stash" {
// in the osdf/stash case, fix url's that have a hostname
remoteUrl.Path, err = url.JoinPath(remoteUrl.Host, remoteUrl.Path)
if err != nil {
log.Errorln("Failed to join remote destination url path:", err)
return PelicanURL{}, err
}
} else if scheme == "pelican" {
// If we have a host and url is pelican, we need to extract federation data from the host
log.Debugln("Detected pelican:// url, getting federation metadata from specified host")
federationUrl := &url.URL{}
// federationUrl, _ := url.Parse(remoteUrl.String())
federationUrl.Scheme = "https"
federationUrl.Path = ""
federationUrl.Host = remoteUrl.Host

// Check if cache has key of federationURL, if not, loader will add it:
pelicanUrlItem := PelicanURLCache.Get(federationUrl.String())
if pelicanUrlItem != nil {
pelicanURL = pelicanUrlItem.Value()
} else {
return PelicanURL{}, fmt.Errorf("Issue getting metadata information from cache. Provided url: %s", federationUrl.String())
}
}
}

// With an osdf:// url scheme, we assume the user will be using the OSDF so load in our osdf metadata for our url
if scheme == "osdf" {
// If we are using an osdf/stash binary, we discovered the federation already --> load into local url metadata
if config.GetPreferredPrefix() == "OSDF" {
log.Debugln("Detected an osdf binary with an osdf:// url, populating metadata with osdf defaults")
if param.Federation_DirectorUrl.GetString() == "" || param.Federation_DiscoveryUrl.GetString() == "" || param.Federation_RegistryUrl.GetString() == "" {
return PelicanURL{}, fmt.Errorf("osdf default metadata is not populated in config")
} else {
pelicanURL.directorUrl = param.Federation_DirectorUrl.GetString()
pelicanURL.discoveryUrl = param.Federation_DiscoveryUrl.GetString()
pelicanURL.registryUrl = param.Federation_RegistryUrl.GetString()
}
} else if config.GetPreferredPrefix() == "PELICAN" {
// We hit this case when we are using a pelican binary but an osdf:// url, therefore we need to disover the osdf federation
log.Debugln("Detected an pelican binary with an osdf:// url, populating metadata with osdf defaults")
// Check if cache has key of federationURL, if not, loader will add it:
pelicanUrlItem := PelicanURLCache.Get("osg-htc.org")
if pelicanUrlItem != nil {
pelicanURL = pelicanUrlItem.Value()
} else {
return PelicanURL{}, fmt.Errorf("Issue getting metadata information from cache")
}
}
} else if scheme == "pelican" && remoteUrl.Host == "" {
// We hit this case when we do not have a hostname with a pelican:// url
if param.Federation_DiscoveryUrl.GetString() == "" {
return PelicanURL{}, fmt.Errorf("Pelican url scheme without discovery-url detected, please provide a federation discovery-url within the hostname or with the -f flag")
} else {
// Check if cache has key of federationURL, if not, loader will add it:
pelicanUrlItem := PelicanURLCache.Get(param.Federation_DiscoveryUrl.GetString())
if pelicanUrlItem != nil {
pelicanURL = pelicanUrlItem.Value()
} else {
return PelicanURL{}, fmt.Errorf("Issue getting metadata information from cache")
}
}
} else if scheme == "" {
// If we don't have a url scheme, then our metadata information should be in the config
log.Debugln("No url scheme detected, getting metadata information from configuration")
pelicanURL.directorUrl = param.Federation_DirectorUrl.GetString()
pelicanURL.discoveryUrl = param.Federation_DiscoveryUrl.GetString()
pelicanURL.registryUrl = param.Federation_RegistryUrl.GetString()

// If the values do not exist, exit with failure
if pelicanURL.directorUrl == "" || pelicanURL.discoveryUrl == "" || pelicanURL.registryUrl == "" {
return PelicanURL{}, fmt.Errorf("Missing metadata information in config, ensure Federation DirectorUrl, RegistryUrl, and DiscoverUrl are all set")
}
}
pelicanURL.objectUrl = remoteUrl
return pelicanURL, nil
}

// Returns a new transfer engine object whose lifetime is tied
// to the provided context. Will launcher worker goroutines to
// handle the underlying transfers
Expand Down Expand Up @@ -707,14 +815,25 @@ func (te *TransferEngine) runJobHandler() error {
//
// The returned object can be further customized as desired.
// This function does not "submit" the job for execution.
func (tc *TransferClient) NewTransferJob(remoteUrl *url.URL, localPath string, upload bool, recursive bool, options ...TransferOption) (tj *TransferJob, err error) {
func (tc *TransferClient) NewTransferJob(remoteUrl *url.URL, localPath string, scheme string, upload bool, recursive bool, options ...TransferOption) (tj *TransferJob, err error) {

id, err := uuid.NewV7()
if err != nil {
return
}

copyUrl := *remoteUrl // Make a copy of the input URL to avoid concurrent issues.
pelicanURL, err := newPelicanURL(remoteUrl, scheme)
if err != nil {
err = errors.Wrap(err, "error generating metadata for specified url")
return
}

if pelicanURL.objectUrl == nil {
err = errors.Wrap(err, "No object url found")
return
}

copyUrl := *pelicanURL.objectUrl // Make a copy of the input URL to avoid concurrent issues.
tj = &TransferJob{
caches: tc.caches,
recursive: recursive,
Expand All @@ -741,45 +860,16 @@ func (tc *TransferClient) NewTransferJob(remoteUrl *url.URL, localPath string, u
}
}

if remoteUrl.Scheme == "pelican" && remoteUrl.Host != "" {
fd := config.GetFederation()
defer config.SetFederation(fd)
config.SetFederation(config.FederationDiscovery{})
fedUrlCopy := *remoteUrl
fedUrlCopy.Scheme = "https"
fedUrlCopy.Path = ""
fedUrlCopy.RawFragment = ""
fedUrlCopy.RawQuery = ""
viper.Set("Federation.DiscoveryUrl", fedUrlCopy.String())
if err = config.DiscoverFederation(); err != nil {
return
}
} else if remoteUrl.Scheme == "osdf" {
if remoteUrl.Host != "" {
remoteUrl.Path = path.Clean(path.Join("/", remoteUrl.Host, remoteUrl.Path))
}
fd := config.GetFederation()
defer config.SetFederation(fd)
config.SetFederation(config.FederationDiscovery{})
fedUrl := &url.URL{}
fedUrl.Scheme = "https"
fedUrl.Host = "osg-htc.org"
viper.Set("Federation.DiscoveryUrl", fedUrl.String())
if err = config.DiscoverFederation(); err != nil {
return
}
}

tj.useDirector = param.Federation_DirectorUrl.GetString() != ""
ns, err := getNamespaceInfo(remoteUrl.Path, param.Federation_DirectorUrl.GetString(), upload)
tj.useDirector = pelicanURL.directorUrl != ""
ns, err := getNamespaceInfo(pelicanURL.objectUrl.Path, pelicanURL.directorUrl, upload)
if err != nil {
log.Errorln(err)
err = errors.Wrapf(err, "failed to get namespace information for remote URL %s", remoteUrl)
err = errors.Wrapf(err, "failed to get namespace information for remote URL %s", pelicanURL.objectUrl)
}
tj.namespace = ns

if upload || ns.UseTokenOnRead {
tj.token, err = getToken(remoteUrl, ns, true, "", tc.tokenLocation, !tj.skipAcquire)
tj.token, err = getToken(pelicanURL.objectUrl, ns, true, "", tc.tokenLocation, !tj.skipAcquire)
if err != nil {
return nil, fmt.Errorf("failed to get token for transfer: %v", err)
}
Expand Down
Loading

0 comments on commit 1ab977d

Please sign in to comment.