Skip to content

Commit

Permalink
Merge branch 'main' of github.com:PelicanPlatform/pelican into issue-…
Browse files Browse the repository at this point in the history
…1461
  • Loading branch information
haoming29 committed Jul 12, 2024
2 parents 93c7586 + abae483 commit 0686eb9
Show file tree
Hide file tree
Showing 31 changed files with 759 additions and 766 deletions.
20 changes: 9 additions & 11 deletions director/director_ui.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ package director
import (
"fmt"
"net/http"
"net/url"
"path"
"strings"

Expand All @@ -40,7 +39,10 @@ type (
}

listServerResponse struct {
Name string `json:"name"`
Name string `json:"name"`
// AuthURL is Deprecated, for Pelican severs, URL is used as the base URL for object access.
// This is to maintain compatibility with the topology servers, where it uses AuthURL for
// accessing protected objects and URL for public objects.
AuthURL string `json:"authUrl"`
BrokerURL string `json:"brokerUrl"`
URL string `json:"url"` // This is server's XRootD URL for file transfer
Expand Down Expand Up @@ -112,16 +114,12 @@ func listServers(ctx *gin.Context) {
log.Debugf("listServers: healthTestUtils not found for server at %s", server.URL.String())
}
filtered, ft := checkFilter(server.Name)
var auth_url string
if server.AuthURL == (url.URL{}) {
auth_url = server.URL.String()
} else {
auth_url = server.AuthURL.String()
}

res := listServerResponse{
Name: server.Name,
BrokerURL: server.BrokerURL.String(),
AuthURL: auth_url,
Name: server.Name,
BrokerURL: server.BrokerURL.String(),
// For web UI, if authURL is not set, we don't want to confuse user by copying server URL as authURL
AuthURL: server.AuthURL.String(),
URL: server.URL.String(),
WebURL: server.WebURL.String(),
Type: server.Type,
Expand Down
4 changes: 2 additions & 2 deletions director/director_ui_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func TestListServers(t *testing.T) {
expectedlistOriginRes := listServerResponse{
Name: mockOriginServerAd.Name,
BrokerURL: mockOriginServerAd.BrokerURL.String(),
AuthURL: mockOriginServerAd.URL.String(),
AuthURL: "",
URL: mockOriginServerAd.URL.String(),
WebURL: mockOriginServerAd.WebURL.String(),
Type: mockOriginServerAd.Type,
Expand All @@ -82,7 +82,7 @@ func TestListServers(t *testing.T) {
expectedlistCacheRes := listServerResponse{
Name: mockCacheServerAd.Name,
BrokerURL: mockCacheServerAd.BrokerURL.String(),
AuthURL: mockCacheServerAd.URL.String(),
AuthURL: "",
URL: mockCacheServerAd.URL.String(),
WebURL: mockCacheServerAd.WebURL.String(),
Type: mockCacheServerAd.Type,
Expand Down
67 changes: 41 additions & 26 deletions director/stat.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,19 +117,19 @@ const (
queryCancelledErr queryErrorType = "CancelledError"
)

func (e headReqTimeoutErr) Error() string {
func (e *headReqTimeoutErr) Error() string {
return e.Message
}

func (e headReqNotFoundErr) Error() string {
func (e *headReqNotFoundErr) Error() string {
return e.Message
}

func (e headReqForbiddenErr) Error() string {
func (e *headReqForbiddenErr) Error() string {
return e.Message
}

func (e headReqCancelledErr) Error() string {
func (e *headReqCancelledErr) Error() string {
return e.Message
}

Expand Down Expand Up @@ -160,7 +160,11 @@ func (q queryResult) String() string {
}
return res
} else {
return fmt.Sprintf("Query failed with error %s: %s", q.ErrorType, q.Msg)
if len(q.DeniedServers) == 0 {
return fmt.Sprintf("Query failed with error %s: %s", q.ErrorType, q.Msg)
} else {
return fmt.Sprintf("Query failed with error %s: %s %d servers require authentication to access the object", q.ErrorType, q.Msg, len(q.DeniedServers))
}
}
}

Expand Down Expand Up @@ -195,18 +199,18 @@ func (stat *ObjectStat) sendHeadReq(ctx context.Context, objectName string, data
return nil, errors.Wrap(err, "unknown request error")
} else {
if urlErr.Err == context.Canceled {
return nil, headReqCancelledErr{"request was cancelled by context"}
return nil, &headReqCancelledErr{"request was cancelled by context"}
}
if urlErr.Timeout() {
return nil, headReqTimeoutErr{fmt.Sprintf("request timeout after %dms", timeout.Milliseconds())}
return nil, &headReqTimeoutErr{fmt.Sprintf("request timeout after %dms", timeout.Milliseconds())}
}
return nil, errors.Wrap(err, "unknown request error")
}
}
if res.StatusCode == 404 {
return nil, headReqNotFoundErr{"file not found on the server " + dataUrl.String()}
return nil, &headReqNotFoundErr{"file not found on the server " + dataUrl.String()}
} else if res.StatusCode == 403 {
return nil, headReqForbiddenErr{fmt.Sprintf("authorization failed for origin %s. Token is required", dataUrl.String()), ""}
return nil, &headReqForbiddenErr{fmt.Sprintf("authorization failed for origin %s. Token is required", dataUrl.String()), ""}
} else if res.StatusCode != 200 {
resBody, err := io.ReadAll(res.Body)
if err != nil {
Expand Down Expand Up @@ -296,12 +300,12 @@ func (stat *ObjectStat) queryServersForObject(cancelContext context.Context, obj
timeout := param.Director_StatTimeout.GetDuration()
positiveReqChan := make(chan *objectMetadata)
negativeReqChan := make(chan error)
deniedReqChan := make(chan headReqForbiddenErr) // Requests with 403 response
deniedReqChan := make(chan *headReqForbiddenErr) // Requests with 403 response
// Cancel the rest of the requests when requests received >= max required
maxCancelCtx, maxCancel := context.WithCancel(context.Background())
numTotalReq := 0
successResult := make([]*objectMetadata, 0)
deniedResult := make([]headReqForbiddenErr, 0)
deniedResult := make([]*headReqForbiddenErr, 0)

if len(ads) < 1 {
maxCancel()
Expand Down Expand Up @@ -330,52 +334,63 @@ func (stat *ObjectStat) queryServersForObject(cancelContext context.Context, obj
// Use an anonymous func to pass variable safely to the goroutine
func(sAdInt server_structs.ServerAd) {
statUtil.Errgroup.Go(func() error {
activeLabels := prometheus.Labels{
"server_name": sAdInt.Name,
"server_url": sAdInt.URL.String(),
"server_type": string(sAdInt.Type),
baseUrl := sAdInt.URL

// For the topology server, if the server does not support public read,
// or the token is provided, then it's safe to assume this request goes to authenticated endpoint
// For Pelican server, we don't populate authURL and only use server URL as the base URL
if sAdInt.FromTopology && (!sAdInt.Caps.PublicReads || cfg.token != "") && sAdInt.AuthURL.String() != "" {
baseUrl = sAD.AuthURL
}
totalLabels := prometheus.Labels{

activeLabels := prometheus.Labels{
"server_name": sAdInt.Name,
"server_url": sAdInt.URL.String(),
"server_type": string(sAdInt.Type),
"result": "",
}
metrics.PelicanDirectorStatActive.With(activeLabels).Inc()
defer metrics.PelicanDirectorStatActive.With(activeLabels).Dec()

metadata, err := stat.ReqHandler(maxCancelCtx, objectName, sAdInt.URL, true, cfg.token, timeout)
metadata, err := stat.ReqHandler(maxCancelCtx, objectName, baseUrl, true, cfg.token, timeout)

if err != nil {
cancelErr := &headReqCancelledErr{}
if err != nil && !errors.As(err, &cancelErr) { // Skip additional requests if the previous one is cancelled
// If the request returns 403 or 500, it could be because we request a digest and xrootd
// either not has this turned on, or had trouble calculating the checksum
// does not have this turned on, or had trouble calculating the checksum
// For old origins/caches, it has different URLs for public VS protected data
// Retry without digest
metadata, err = stat.ReqHandler(maxCancelCtx, objectName, sAdInt.URL, false, cfg.token, timeout)
metadata, err = stat.ReqHandler(maxCancelCtx, objectName, baseUrl, false, cfg.token, timeout)
}

totalLabels := prometheus.Labels{
"server_name": sAdInt.Name,
"server_url": baseUrl.String(),
"server_type": string(sAdInt.Type),
"result": "",
}
if err != nil {
switch e := err.(type) {
case headReqTimeoutErr:
case *headReqTimeoutErr:
log.Debugf("Timeout querying %s server %s for object %s after %s: %s", sAdInt.Type, sAdInt.URL.String(), objectName, timeout.String(), e.Message)
negativeReqChan <- err
totalLabels["result"] = string(metrics.StatTimeout)
metrics.PelicanDirectorStatTotal.With(totalLabels).Inc()
return nil
case headReqNotFoundErr:
case *headReqNotFoundErr:
log.Debugf("Object %s not found at %s server %s: %s", objectName, sAdInt.Type, sAdInt.URL.String(), e.Message)
negativeReqChan <- err
totalLabels["result"] = string(metrics.StatNotFound)
metrics.PelicanDirectorStatTotal.With(totalLabels).Inc()
return nil
case headReqForbiddenErr:
fErr := err.(headReqForbiddenErr)
case *headReqForbiddenErr:
fErr := err.(*headReqForbiddenErr)
fErr.IssuerUrl = sAD.AuthURL.String()
log.Debugf("Access denied for object %s at %s server %s: %s", objectName, sAdInt.Type, sAdInt.URL.String(), e.Message)
deniedReqChan <- fErr
totalLabels["result"] = string(metrics.StatForbidden)
metrics.PelicanDirectorStatTotal.With(totalLabels).Inc()
return nil
case headReqCancelledErr:
case *headReqCancelledErr:
// Don't send to negativeReqChan as cancellation won't count towards total requests
totalLabels["result"] = string(metrics.StatCancelled)
metrics.PelicanDirectorStatTotal.With(totalLabels).Inc()
Expand Down
Loading

0 comments on commit 0686eb9

Please sign in to comment.