From b37a07af93b3b3ab2bc2bd65898ebc7acd6c34fc Mon Sep 17 00:00:00 2001 From: Joe Reuss Date: Tue, 9 Jan 2024 16:09:41 -0600 Subject: [PATCH] Fixes to origin/director advertise to client These fixes are for PR #622 which helps reinstate backwards compatibility. Changes include: - Returning `uploadFile` to its original state - Running a PUT on the director to get PUT endpoint instead of a GET - Moved this logic to `client/main.go` when we gather ns info - Moved the gathering of ns info to a seperate function --- client/handle_http.go | 39 ++--------- client/handle_http_test.go | 2 +- client/main.go | 139 ++++++++++++++++++++++++------------- cmd/origin.go | 2 +- director/advertise.go | 2 +- director/cache_ads.go | 2 +- director/origin_api.go | 2 +- director/redirect.go | 14 ++-- docs/parameters.yaml | 4 +- origin_ui/advertise.go | 4 +- param/parameters.go | 2 +- param/parameters_struct.go | 4 +- 12 files changed, 113 insertions(+), 103 deletions(-) diff --git a/client/handle_http.go b/client/handle_http.go index a53640d81..df9f58a92 100644 --- a/client/handle_http.go +++ b/client/handle_http.go @@ -19,7 +19,6 @@ package client import ( - "bytes" "context" "fmt" "io" @@ -849,40 +848,16 @@ func UploadFile(src string, origDest *url.URL, token string, namespace namespace nonZeroSize = fileInfo.Size() > 0 } - // call a GET on the director, director will respond with our endpoint - directorUrlStr := param.Federation_DirectorUrl.GetString() - directorUrl, err := url.Parse(directorUrlStr) - if err != nil { - return 0, errors.Wrap(err, "failed to parse director url") - } - directorUrl.Path, err = url.JoinPath("/api/v1.0/director/origin", origDest.Path) - if err != nil { - return 0, errors.Wrap(err, "failed to parse director path for upload") - } - - payload := []byte("forPUT") - req, err := http.NewRequest("GET", directorUrl.String(), bytes.NewBuffer(payload)) + // Parse the writeback host as a URL + writebackhostUrl, err := url.Parse(namespace.WriteBackHost) if err != nil { - return 0, errors.Wrap(err, "failed to construct request for director-origin query") + return 0, err } - client := &http.Client{ - Transport: config.GetTransport(), - CheckRedirect: func(req *http.Request, via []*http.Request) error { - return http.ErrUseLastResponse - }, - } - resp, err := client.Do(req) - if err != nil { - return 0, errors.Wrap(err, "failed to send request to director to obtain upload endpoint") - } - if resp.StatusCode == 405 { - return 0, errors.New("Error 405: No writeable origins were found") - } - defer resp.Body.Close() - dest, err := url.Parse(resp.Header.Get("Location")) - if err != nil { - return 0, errors.Wrap(err, "failed to parse location header from director response") + dest := &url.URL{ + Host: writebackhostUrl.Host, + Scheme: "https", + Path: origDest.Path, } // Create the wrapped reader and send it to the request diff --git a/client/handle_http_test.go b/client/handle_http_test.go index ff5bc90ad..66e83933e 100644 --- a/client/handle_http_test.go +++ b/client/handle_http_test.go @@ -469,7 +469,7 @@ func TestFullUpload(t *testing.T) { viper.Set("Origin.EnableCmsd", false) viper.Set("Origin.EnableMacaroons", false) viper.Set("Origin.EnableVoms", false) - viper.Set("Origin.WriteEnabled", true) + viper.Set("Origin.EnableWrite", true) viper.Set("TLSSkipVerify", true) viper.Set("Server.EnableUI", false) viper.Set("Registry.DbLocation", filepath.Join(t.TempDir(), "ns-registry.sqlite")) diff --git a/client/main.go b/client/main.go index 89b87598e..778abc53e 100644 --- a/client/main.go +++ b/client/main.go @@ -23,6 +23,7 @@ import ( "errors" "fmt" "net" + "net/http" "net/url" "regexp" "runtime/debug" @@ -412,6 +413,85 @@ func discoverHTCondorToken(tokenName string) string { return tokenLocation } +// This function receives remote location and tries to get namespace information from it +func getNamespaceInfo(remoteLocation string, remoteUrl *url.URL, OSDFDirectorUrl string, isPut bool) (namespaces.Namespace, error) { + var ns namespaces.Namespace + // If we have a director set, go through that for namespace info, otherwise use topology + if OSDFDirectorUrl != "" { + directorOriginsUrl, err := url.Parse(OSDFDirectorUrl) + if err != nil { + return ns, err + } + directorOriginsUrl.Path, err = url.JoinPath("api", "v1.0", "director", "origin") + if err != nil { + return ns, err + } + dirResp, err := QueryDirector(remoteLocation, directorOriginsUrl.String()) + if err != nil { + log.Errorln("Error while querying the Director:", err) + AddError(err) + return ns, err + } + ns, err = CreateNsFromDirectorResp(dirResp) + if err != nil { + AddError(err) + return ns, err + } + + // if we are doing a PUT, we need to get our endpoint from the director + if isPut { + // call a PUT on the director, director will respond with our endpoint + directorUrlStr := param.Federation_DirectorUrl.GetString() + directorUrl, err := url.Parse(directorUrlStr) + if err != nil { + log.Errorln("failed to parse director url") + return ns, err + } + directorUrl.Path, err = url.JoinPath("/api/v1.0/director/origin", remoteUrl.Path) + if err != nil { + log.Errorln("failed to parse director path for upload") + return ns, err + } + + req, err := http.NewRequest("PUT", directorUrl.String(), nil) + if err != nil { + log.Errorln(err) + return ns, errors.New("failed to construct request for director-origin query") + } + + client := &http.Client{ + Transport: config.GetTransport(), + CheckRedirect: func(req *http.Request, via []*http.Request) error { + return http.ErrUseLastResponse + }, + } + resp, err := client.Do(req) + if err != nil { + log.Errorln(err) + return ns, errors.New("failed to send request to director to obtain upload endpoint") + } + if resp.StatusCode == 405 { + return ns, errors.New("Error 405: No writeable origins were found") + } + defer resp.Body.Close() + writeBackHost := resp.Header.Get("Location") + ns.WriteBackHost = writeBackHost + if err != nil { + log.Errorln(err) + return ns, errors.New("failed to parse location header from director response") + } + } + return ns, err + } else { + ns, err := namespaces.MatchNamespace(remoteUrl.Path) + if err != nil { + AddError(err) + return ns, err + } + return ns, err + } +} + // Start the transfer, whether read or write back func DoStashCPSingle(sourceFile string, destination string, methods []string, recursive bool) (bytesTransferred int64, err error) { @@ -499,41 +579,18 @@ func DoStashCPSingle(sourceFile string, destination string, methods []string, re // For read it will be the source. OSDFDirectorUrl := param.Federation_DirectorUrl.GetString() - useOSDFDirector := viper.IsSet("Federation.DirectorURL") + isPut := false if destScheme == "stash" || destScheme == "osdf" || destScheme == "pelican" { log.Debugln("Detected writeback") + isPut = true if !strings.HasPrefix(destination, "/") { destination = strings.TrimPrefix(destination, destScheme+"://") } - var ns namespaces.Namespace - // If we have a director set, go through that for namespace info, otherwise use topology - if useOSDFDirector { - directorOriginsUrl, err := url.Parse(OSDFDirectorUrl) - if err != nil { - return 0, err - } - directorOriginsUrl.Path, err = url.JoinPath("api", "v1.0", "director", "origin") - if err != nil { - return 0, err - } - dirResp, err := QueryDirector(destination, directorOriginsUrl.String()) - if err != nil { - log.Errorln("Error while querying the Director:", err) - AddError(err) - return 0, err - } - ns, err = CreateNsFromDirectorResp(dirResp) - if err != nil { - AddError(err) - return 0, err - } - } else { - ns, err = namespaces.MatchNamespace(dest_url.Path) - if err != nil { - AddError(err) - return 0, err - } + ns, err := getNamespaceInfo(destination, dest_url, OSDFDirectorUrl, isPut) + if err != nil { + log.Errorln(err) + return 0, errors.New("Failed to get namespace information from destination") } uploadedBytes, err := doWriteBack(source_url.Path, dest_url, ns, recursive) AddError(err) @@ -552,26 +609,10 @@ func DoStashCPSingle(sourceFile string, destination string, methods []string, re sourceFile = "/" + sourceFile } - var ns namespaces.Namespace - // If we have a director set, go through that for namespace info, otherwise use topology - if useOSDFDirector { - dirResp, err := QueryDirector(sourceFile, OSDFDirectorUrl) - if err != nil { - log.Errorln("Error while querying the Director:", err) - AddError(err) - return 0, err - } - ns, err = CreateNsFromDirectorResp(dirResp) - if err != nil { - AddError(err) - return 0, err - } - } else { - ns, err = namespaces.MatchNamespace(source_url.Path) - if err != nil { - AddError(err) - return 0, err - } + ns, err := getNamespaceInfo(sourceFile, source_url, OSDFDirectorUrl, isPut) + if err != nil { + log.Errorln(err) + return 0, errors.New("Failed to get namespace information from source") } // get absolute path diff --git a/cmd/origin.go b/cmd/origin.go index 845a4c0b0..8405ede61 100644 --- a/cmd/origin.go +++ b/cmd/origin.go @@ -122,7 +122,7 @@ func init() { // The -w flag is used if we want the origin to be writeable. originServeCmd.Flags().BoolP("writeable", "", true, "Allow/disable writting to the origin") - if err := viper.BindPFlag("Origin.WriteEnabled", originServeCmd.Flags().Lookup("writeable")); err != nil { + if err := viper.BindPFlag("Origin.EnableWrite", originServeCmd.Flags().Lookup("writeable")); err != nil { panic(err) } diff --git a/director/advertise.go b/director/advertise.go index 5da7b163a..ba32f1cb3 100644 --- a/director/advertise.go +++ b/director/advertise.go @@ -36,7 +36,7 @@ func parseServerAd(server utils.Server, serverType ServerType) ServerAd { serverAd.Type = serverType serverAd.Name = server.Resource - serverAd.WriteEnabled = param.Origin_WriteEnabled.GetBool() + serverAd.EnableWrite = param.Origin_EnableWrite.GetBool() // url.Parse requires that the scheme be present before the hostname, // but endpoints do not have a scheme. As such, we need to add one for the. // correct parsing. Luckily, we don't use this anywhere else (it's just to diff --git a/director/cache_ads.go b/director/cache_ads.go index c71afc752..385894e5d 100644 --- a/director/cache_ads.go +++ b/director/cache_ads.go @@ -53,7 +53,7 @@ type ( Type ServerType Latitude float64 Longitude float64 - WriteEnabled bool + EnableWrite bool } ServerType string diff --git a/director/origin_api.go b/director/origin_api.go index 13aaa2790..570465ec9 100644 --- a/director/origin_api.go +++ b/director/origin_api.go @@ -45,7 +45,7 @@ type ( URL string `json:"url"` // This is the url for origin's XRootD service and file transfer WebURL string `json:"web_url,omitempty"` // This is the url for origin's web engine and APIs Namespaces []NamespaceAd `json:"namespaces"` - WriteEnabled bool `json:"writeenabled"` + EnableWrite bool `json:"enablewrite"` } ) diff --git a/director/redirect.go b/director/redirect.go index 6f960cf02..6282290f1 100644 --- a/director/redirect.go +++ b/director/redirect.go @@ -21,7 +21,6 @@ package director import ( "context" "fmt" - "io" "net/http" "net/netip" "net/url" @@ -287,16 +286,10 @@ func RedirectToOrigin(ginCtx *gin.Context) { namespaceAd.Path, namespaceAd.RequireToken, namespaceAd.DirlistHost)} var redirectURL url.URL - body, err := io.ReadAll(ginCtx.Request.Body) - if err != nil { - ginCtx.String(http.StatusInternalServerError, "Could not read body of request\n") - return - } - // If we are doing a PUT, check to see if any origins are writeable - if strings.Contains(string(body), "forPUT") { + if ginCtx.Request.Method == "PUT" { for idx, ad := range originAds { - if ad.WriteEnabled { + if ad.EnableWrite { redirectURL = getRedirectURL(reqPath, originAds[idx], namespaceAd.RequireToken) ginCtx.Redirect(http.StatusTemporaryRedirect, getFinalRedirectURL(redirectURL, authzBearerEscaped)) return @@ -467,7 +460,7 @@ func registerServeAd(engineCtx context.Context, ctx *gin.Context, sType ServerTy URL: *ad_url, WebURL: *adWebUrl, Type: sType, - WriteEnabled: ad.WriteEnabled, + EnableWrite: ad.EnableWrite, } hasOriginAdInCache := serverAds.Has(sAd) @@ -552,6 +545,7 @@ func RegisterDirector(ctx context.Context, router *gin.RouterGroup) { // Establish the routes used for cache/origin redirection router.GET("/api/v1.0/director/object/*any", RedirectToCache) router.GET("/api/v1.0/director/origin/*any", RedirectToOrigin) + router.PUT("/api/v1.0/director/origin/*any", RedirectToOrigin) router.POST("/api/v1.0/director/registerOrigin", func(gctx *gin.Context) { RegisterOrigin(ctx, gctx) }) // In the foreseeable feature, director will scrape all servers in Pelican ecosystem (including registry) // so that director can be our point of contact for collecting system-level metrics. diff --git a/docs/parameters.yaml b/docs/parameters.yaml index 95b1080b7..6e0215f28 100644 --- a/docs/parameters.yaml +++ b/docs/parameters.yaml @@ -302,9 +302,9 @@ type: string default: none components: ["origin"] --- -name: Origin.WriteEnabled +name: Origin.EnableWrite description: >- - A boolean indicating if an origin is writeable on startup + A boolean indicating if an origin allows write access type: bool default: true components: ["origin"] diff --git a/origin_ui/advertise.go b/origin_ui/advertise.go index f95774a8c..af08b9770 100644 --- a/origin_ui/advertise.go +++ b/origin_ui/advertise.go @@ -53,7 +53,7 @@ func (server *OriginServer) CreateAdvertisement(name string, originUrl string, o prefix := param.Origin_NamespacePrefix.GetString() - writeEnabled := param.Origin_WriteEnabled.GetBool() + enableWrite := param.Origin_EnableWrite.GetBool() // TODO: Need to figure out where to get some of these values // so that they aren't hardcoded... nsAd := director.NamespaceAd{ @@ -69,7 +69,7 @@ func (server *OriginServer) CreateAdvertisement(name string, originUrl string, o URL: originUrl, WebURL: originWebUrl, Namespaces: []director.NamespaceAd{nsAd}, - WriteEnabled: writeEnabled, + EnableWrite: enableWrite, } return ad, nil diff --git a/param/parameters.go b/param/parameters.go index a9edc2dec..bbe0d5ad1 100644 --- a/param/parameters.go +++ b/param/parameters.go @@ -173,10 +173,10 @@ var ( Origin_EnableIssuer = BoolParam{"Origin.EnableIssuer"} Origin_EnableUI = BoolParam{"Origin.EnableUI"} Origin_EnableVoms = BoolParam{"Origin.EnableVoms"} + Origin_EnableWrite = BoolParam{"Origin.EnableWrite"} Origin_Multiuser = BoolParam{"Origin.Multiuser"} Origin_ScitokensMapSubject = BoolParam{"Origin.ScitokensMapSubject"} Origin_SelfTest = BoolParam{"Origin.SelfTest"} - Origin_WriteEnabled = BoolParam{"Origin.WriteEnabled"} Registry_RequireKeyChaining = BoolParam{"Registry.RequireKeyChaining"} Server_EnableUI = BoolParam{"Server.EnableUI"} StagePlugin_Hook = BoolParam{"StagePlugin.Hook"} diff --git a/param/parameters_struct.go b/param/parameters_struct.go index a2a276c38..48f95c628 100644 --- a/param/parameters_struct.go +++ b/param/parameters_struct.go @@ -87,6 +87,7 @@ type config struct { EnableIssuer bool EnableUI bool EnableVoms bool + EnableWrite bool ExportVolume string Mode string Multiuser bool @@ -104,7 +105,6 @@ type config struct { ScitokensUsernameClaim string SelfTest bool Url string - WriteEnabled bool XRootDPrefix string } Plugin struct { @@ -250,6 +250,7 @@ type configWithType struct { EnableIssuer struct { Type string; Value bool } EnableUI struct { Type string; Value bool } EnableVoms struct { Type string; Value bool } + EnableWrite struct { Type string; Value bool } ExportVolume struct { Type string; Value string } Mode struct { Type string; Value string } Multiuser struct { Type string; Value bool } @@ -267,7 +268,6 @@ type configWithType struct { ScitokensUsernameClaim struct { Type string; Value string } SelfTest struct { Type string; Value bool } Url struct { Type string; Value string } - WriteEnabled struct { Type string; Value bool } XRootDPrefix struct { Type string; Value string } } Plugin struct {