Skip to content

Commit

Permalink
Fixes to origin/director advertise to client
Browse files Browse the repository at this point in the history
These fixes are for PR PelicanPlatform#622 which helps reinstate backwards
compatibility. Changes include:
- Returning `uploadFile` to its original state
- Running a PUT on the director to get PUT endpoint instead of a GET
   - Moved this logic to `client/main.go` when we gather ns info
- Moved the gathering of ns info to a seperate function
  • Loading branch information
joereuss12 committed Jan 10, 2024
1 parent a569faa commit b37a07a
Show file tree
Hide file tree
Showing 12 changed files with 113 additions and 103 deletions.
39 changes: 7 additions & 32 deletions client/handle_http.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
package client

import (
"bytes"
"context"
"fmt"
"io"
Expand Down Expand Up @@ -849,40 +848,16 @@ func UploadFile(src string, origDest *url.URL, token string, namespace namespace
nonZeroSize = fileInfo.Size() > 0
}

// call a GET on the director, director will respond with our endpoint
directorUrlStr := param.Federation_DirectorUrl.GetString()
directorUrl, err := url.Parse(directorUrlStr)
if err != nil {
return 0, errors.Wrap(err, "failed to parse director url")
}
directorUrl.Path, err = url.JoinPath("/api/v1.0/director/origin", origDest.Path)
if err != nil {
return 0, errors.Wrap(err, "failed to parse director path for upload")
}

payload := []byte("forPUT")
req, err := http.NewRequest("GET", directorUrl.String(), bytes.NewBuffer(payload))
// Parse the writeback host as a URL
writebackhostUrl, err := url.Parse(namespace.WriteBackHost)
if err != nil {
return 0, errors.Wrap(err, "failed to construct request for director-origin query")
return 0, err
}

client := &http.Client{
Transport: config.GetTransport(),
CheckRedirect: func(req *http.Request, via []*http.Request) error {
return http.ErrUseLastResponse
},
}
resp, err := client.Do(req)
if err != nil {
return 0, errors.Wrap(err, "failed to send request to director to obtain upload endpoint")
}
if resp.StatusCode == 405 {
return 0, errors.New("Error 405: No writeable origins were found")
}
defer resp.Body.Close()
dest, err := url.Parse(resp.Header.Get("Location"))
if err != nil {
return 0, errors.Wrap(err, "failed to parse location header from director response")
dest := &url.URL{
Host: writebackhostUrl.Host,
Scheme: "https",
Path: origDest.Path,
}

// Create the wrapped reader and send it to the request
Expand Down
2 changes: 1 addition & 1 deletion client/handle_http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -469,7 +469,7 @@ func TestFullUpload(t *testing.T) {
viper.Set("Origin.EnableCmsd", false)
viper.Set("Origin.EnableMacaroons", false)
viper.Set("Origin.EnableVoms", false)
viper.Set("Origin.WriteEnabled", true)
viper.Set("Origin.EnableWrite", true)
viper.Set("TLSSkipVerify", true)
viper.Set("Server.EnableUI", false)
viper.Set("Registry.DbLocation", filepath.Join(t.TempDir(), "ns-registry.sqlite"))
Expand Down
139 changes: 90 additions & 49 deletions client/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"errors"
"fmt"
"net"
"net/http"
"net/url"
"regexp"
"runtime/debug"
Expand Down Expand Up @@ -412,6 +413,85 @@ func discoverHTCondorToken(tokenName string) string {
return tokenLocation
}

// This function receives remote location and tries to get namespace information from it
func getNamespaceInfo(remoteLocation string, remoteUrl *url.URL, OSDFDirectorUrl string, isPut bool) (namespaces.Namespace, error) {
var ns namespaces.Namespace
// If we have a director set, go through that for namespace info, otherwise use topology
if OSDFDirectorUrl != "" {
directorOriginsUrl, err := url.Parse(OSDFDirectorUrl)
if err != nil {
return ns, err
}
directorOriginsUrl.Path, err = url.JoinPath("api", "v1.0", "director", "origin")
if err != nil {
return ns, err
}
dirResp, err := QueryDirector(remoteLocation, directorOriginsUrl.String())
if err != nil {
log.Errorln("Error while querying the Director:", err)
AddError(err)
return ns, err
}
ns, err = CreateNsFromDirectorResp(dirResp)
if err != nil {
AddError(err)
return ns, err
}

// if we are doing a PUT, we need to get our endpoint from the director
if isPut {
// call a PUT on the director, director will respond with our endpoint
directorUrlStr := param.Federation_DirectorUrl.GetString()
directorUrl, err := url.Parse(directorUrlStr)
if err != nil {
log.Errorln("failed to parse director url")
return ns, err
}
directorUrl.Path, err = url.JoinPath("/api/v1.0/director/origin", remoteUrl.Path)
if err != nil {
log.Errorln("failed to parse director path for upload")
return ns, err
}

req, err := http.NewRequest("PUT", directorUrl.String(), nil)
if err != nil {
log.Errorln(err)
return ns, errors.New("failed to construct request for director-origin query")
}

client := &http.Client{
Transport: config.GetTransport(),
CheckRedirect: func(req *http.Request, via []*http.Request) error {
return http.ErrUseLastResponse
},
}
resp, err := client.Do(req)
if err != nil {
log.Errorln(err)
return ns, errors.New("failed to send request to director to obtain upload endpoint")
}
if resp.StatusCode == 405 {
return ns, errors.New("Error 405: No writeable origins were found")
}
defer resp.Body.Close()
writeBackHost := resp.Header.Get("Location")
ns.WriteBackHost = writeBackHost
if err != nil {
log.Errorln(err)
return ns, errors.New("failed to parse location header from director response")
}
}
return ns, err
} else {
ns, err := namespaces.MatchNamespace(remoteUrl.Path)
if err != nil {
AddError(err)
return ns, err
}
return ns, err
}
}

// Start the transfer, whether read or write back
func DoStashCPSingle(sourceFile string, destination string, methods []string, recursive bool) (bytesTransferred int64, err error) {

Expand Down Expand Up @@ -499,41 +579,18 @@ func DoStashCPSingle(sourceFile string, destination string, methods []string, re
// For read it will be the source.

OSDFDirectorUrl := param.Federation_DirectorUrl.GetString()
useOSDFDirector := viper.IsSet("Federation.DirectorURL")
isPut := false

if destScheme == "stash" || destScheme == "osdf" || destScheme == "pelican" {
log.Debugln("Detected writeback")
isPut = true
if !strings.HasPrefix(destination, "/") {
destination = strings.TrimPrefix(destination, destScheme+"://")
}
var ns namespaces.Namespace
// If we have a director set, go through that for namespace info, otherwise use topology
if useOSDFDirector {
directorOriginsUrl, err := url.Parse(OSDFDirectorUrl)
if err != nil {
return 0, err
}
directorOriginsUrl.Path, err = url.JoinPath("api", "v1.0", "director", "origin")
if err != nil {
return 0, err
}
dirResp, err := QueryDirector(destination, directorOriginsUrl.String())
if err != nil {
log.Errorln("Error while querying the Director:", err)
AddError(err)
return 0, err
}
ns, err = CreateNsFromDirectorResp(dirResp)
if err != nil {
AddError(err)
return 0, err
}
} else {
ns, err = namespaces.MatchNamespace(dest_url.Path)
if err != nil {
AddError(err)
return 0, err
}
ns, err := getNamespaceInfo(destination, dest_url, OSDFDirectorUrl, isPut)
if err != nil {
log.Errorln(err)
return 0, errors.New("Failed to get namespace information from destination")
}
uploadedBytes, err := doWriteBack(source_url.Path, dest_url, ns, recursive)
AddError(err)
Expand All @@ -552,26 +609,10 @@ func DoStashCPSingle(sourceFile string, destination string, methods []string, re
sourceFile = "/" + sourceFile
}

var ns namespaces.Namespace
// If we have a director set, go through that for namespace info, otherwise use topology
if useOSDFDirector {
dirResp, err := QueryDirector(sourceFile, OSDFDirectorUrl)
if err != nil {
log.Errorln("Error while querying the Director:", err)
AddError(err)
return 0, err
}
ns, err = CreateNsFromDirectorResp(dirResp)
if err != nil {
AddError(err)
return 0, err
}
} else {
ns, err = namespaces.MatchNamespace(source_url.Path)
if err != nil {
AddError(err)
return 0, err
}
ns, err := getNamespaceInfo(sourceFile, source_url, OSDFDirectorUrl, isPut)
if err != nil {
log.Errorln(err)
return 0, errors.New("Failed to get namespace information from source")
}

// get absolute path
Expand Down
2 changes: 1 addition & 1 deletion cmd/origin.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ func init() {

// The -w flag is used if we want the origin to be writeable.
originServeCmd.Flags().BoolP("writeable", "", true, "Allow/disable writting to the origin")
if err := viper.BindPFlag("Origin.WriteEnabled", originServeCmd.Flags().Lookup("writeable")); err != nil {
if err := viper.BindPFlag("Origin.EnableWrite", originServeCmd.Flags().Lookup("writeable")); err != nil {
panic(err)
}

Expand Down
2 changes: 1 addition & 1 deletion director/advertise.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func parseServerAd(server utils.Server, serverType ServerType) ServerAd {
serverAd.Type = serverType
serverAd.Name = server.Resource

serverAd.WriteEnabled = param.Origin_WriteEnabled.GetBool()
serverAd.EnableWrite = param.Origin_EnableWrite.GetBool()
// url.Parse requires that the scheme be present before the hostname,
// but endpoints do not have a scheme. As such, we need to add one for the.
// correct parsing. Luckily, we don't use this anywhere else (it's just to
Expand Down
2 changes: 1 addition & 1 deletion director/cache_ads.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ type (
Type ServerType
Latitude float64
Longitude float64
WriteEnabled bool
EnableWrite bool
}

ServerType string
Expand Down
2 changes: 1 addition & 1 deletion director/origin_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ type (
URL string `json:"url"` // This is the url for origin's XRootD service and file transfer
WebURL string `json:"web_url,omitempty"` // This is the url for origin's web engine and APIs
Namespaces []NamespaceAd `json:"namespaces"`
WriteEnabled bool `json:"writeenabled"`
EnableWrite bool `json:"enablewrite"`
}
)

Expand Down
14 changes: 4 additions & 10 deletions director/redirect.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ package director
import (
"context"
"fmt"
"io"
"net/http"
"net/netip"
"net/url"
Expand Down Expand Up @@ -287,16 +286,10 @@ func RedirectToOrigin(ginCtx *gin.Context) {
namespaceAd.Path, namespaceAd.RequireToken, namespaceAd.DirlistHost)}

var redirectURL url.URL
body, err := io.ReadAll(ginCtx.Request.Body)
if err != nil {
ginCtx.String(http.StatusInternalServerError, "Could not read body of request\n")
return
}

// If we are doing a PUT, check to see if any origins are writeable
if strings.Contains(string(body), "forPUT") {
if ginCtx.Request.Method == "PUT" {
for idx, ad := range originAds {
if ad.WriteEnabled {
if ad.EnableWrite {
redirectURL = getRedirectURL(reqPath, originAds[idx], namespaceAd.RequireToken)
ginCtx.Redirect(http.StatusTemporaryRedirect, getFinalRedirectURL(redirectURL, authzBearerEscaped))
return
Expand Down Expand Up @@ -467,7 +460,7 @@ func registerServeAd(engineCtx context.Context, ctx *gin.Context, sType ServerTy
URL: *ad_url,
WebURL: *adWebUrl,
Type: sType,
WriteEnabled: ad.WriteEnabled,
EnableWrite: ad.EnableWrite,
}

hasOriginAdInCache := serverAds.Has(sAd)
Expand Down Expand Up @@ -552,6 +545,7 @@ func RegisterDirector(ctx context.Context, router *gin.RouterGroup) {
// Establish the routes used for cache/origin redirection
router.GET("/api/v1.0/director/object/*any", RedirectToCache)
router.GET("/api/v1.0/director/origin/*any", RedirectToOrigin)
router.PUT("/api/v1.0/director/origin/*any", RedirectToOrigin)
router.POST("/api/v1.0/director/registerOrigin", func(gctx *gin.Context) { RegisterOrigin(ctx, gctx) })
// In the foreseeable feature, director will scrape all servers in Pelican ecosystem (including registry)
// so that director can be our point of contact for collecting system-level metrics.
Expand Down
4 changes: 2 additions & 2 deletions docs/parameters.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -302,9 +302,9 @@ type: string
default: none
components: ["origin"]
---
name: Origin.WriteEnabled
name: Origin.EnableWrite
description: >-
A boolean indicating if an origin is writeable on startup
A boolean indicating if an origin allows write access
type: bool
default: true
components: ["origin"]
Expand Down
4 changes: 2 additions & 2 deletions origin_ui/advertise.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func (server *OriginServer) CreateAdvertisement(name string, originUrl string, o

prefix := param.Origin_NamespacePrefix.GetString()

writeEnabled := param.Origin_WriteEnabled.GetBool()
enableWrite := param.Origin_EnableWrite.GetBool()
// TODO: Need to figure out where to get some of these values
// so that they aren't hardcoded...
nsAd := director.NamespaceAd{
Expand All @@ -69,7 +69,7 @@ func (server *OriginServer) CreateAdvertisement(name string, originUrl string, o
URL: originUrl,
WebURL: originWebUrl,
Namespaces: []director.NamespaceAd{nsAd},
WriteEnabled: writeEnabled,
EnableWrite: enableWrite,
}

return ad, nil
Expand Down
2 changes: 1 addition & 1 deletion param/parameters.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions param/parameters_struct.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit b37a07a

Please sign in to comment.