Skip to content

Commit

Permalink
Merge pull request PelicanPlatform#1041 from bbockelm/local_cache_errmsg
Browse files Browse the repository at this point in the history
Fix error responses for local cache
  • Loading branch information
joereuss12 authored Apr 5, 2024
2 parents 4332dbd + 99238dd commit 3dffcab
Show file tree
Hide file tree
Showing 25 changed files with 348 additions and 99 deletions.
12 changes: 12 additions & 0 deletions cache/advertise.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ type (
CacheServer struct {
server_structs.NamespaceHolder
namespaceFilter map[string]struct{}
pids []int
}
)

Expand All @@ -49,6 +50,17 @@ func (server *CacheServer) CreateAdvertisement(name string, originUrl string, or
return &ad, nil
}

// SetPids records the given process IDs on the cache server.
//
// The slice is copied, so later changes the caller makes to pids are not
// reflected in the server's stored list.
func (server *CacheServer) SetPids(pids []int) {
	owned := append(make([]int, 0, len(pids)), pids...)
	server.pids = owned
}

// GetPids returns the process IDs previously stored on the cache server.
//
// The result is a fresh copy, so the caller may modify it without affecting
// the server's stored list.
func (server *CacheServer) GetPids() (pids []int) {
	pids = append(make([]int, 0, len(server.pids)), server.pids...)
	return pids
}

func (server *CacheServer) SetFilters() {
/*
* Converts the list of permitted namespaces to a set and stores it for the serve
Expand Down
55 changes: 49 additions & 6 deletions client/handle_http.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,13 @@ type (
Err error
}

// StatusCodeError is a wrapper around grab.StatusCodeError that indicates the server returned
// a non-200 code.
//
// The wrapper is done to provide a Pelican-based error hierarchy in case we ever decide to have
// a different underlying download package.
StatusCodeError grab.StatusCodeError

// Represents the results of a single object transfer,
// potentially across multiple attempts / retries.
TransferResults struct {
Expand Down Expand Up @@ -351,6 +358,18 @@ func (e *ConnectionSetupError) Is(target error) bool {
return ok
}

// Error returns the message produced by the underlying grab.StatusCodeError
// for the same status code.
func (e *StatusCodeError) Error() string {
	underlying := grab.StatusCodeError(*e)
	return underlying.Error()
}

// Is reports whether target is a *StatusCodeError carrying the same status
// code as e, so errors.Is can match on a specific code.
//
// Targets of any other type never match. A nil receiver or a nil
// *StatusCodeError target is never dereferenced: two nil pointers are
// considered equal, and a nil pointer never matches a non-nil one.
func (e *StatusCodeError) Is(target error) bool {
	sce, ok := target.(*StatusCodeError)
	if !ok {
		return false
	}
	// Guard against typed-nil pointers; dereferencing them would panic in
	// the middle of an error comparison.
	if e == nil || sce == nil {
		return e == sce
	}
	return int(*sce) == int(*e)
}

// hasPort test the host if it includes a port
func hasPort(host string) bool {
var checkPort = regexp.MustCompile("^.*:[0-9]+$")
Expand Down Expand Up @@ -864,7 +883,7 @@ func (te *TransferEngine) runJobHandler() error {
//
// The returned object can be further customized as desired.
// This function does not "submit" the job for execution.
func (tc *TransferClient) NewTransferJob(remoteUrl *url.URL, localPath string, upload bool, recursive bool, project string, options ...TransferOption) (tj *TransferJob, err error) {
func (tc *TransferClient) NewTransferJob(ctx context.Context, remoteUrl *url.URL, localPath string, upload bool, recursive bool, project string, options ...TransferOption) (tj *TransferJob, err error) {

id, err := uuid.NewV7()
if err != nil {
Expand All @@ -891,7 +910,19 @@ func (tc *TransferClient) NewTransferJob(remoteUrl *url.URL, localPath string, u
token: tc.token,
project: project,
}
tj.ctx, tj.cancel = context.WithCancel(tc.ctx)

mergeCancel := func(ctx1, ctx2 context.Context) (context.Context, context.CancelFunc) {
newCtx, cancel := context.WithCancel(ctx1)
stop := context.AfterFunc(ctx2, func() {
cancel()
})
return newCtx, func() {
stop()
cancel()
}
}

tj.ctx, tj.cancel = mergeCancel(ctx, tc.ctx)

for _, option := range options {
switch option.Ident() {
Expand Down Expand Up @@ -1438,7 +1469,7 @@ func downloadObject(transfer *transferFile) (transferResults TransferResults, er
errorString += "due to proxy " + AddrString + " error: " + ope.Unwrap().Error()
} else if errors.As(err, &cse) {
errorString += "+ proxy=" + strconv.FormatBool(transferEndpoint.Proxy) + ": "
if sce, ok := cse.Unwrap().(grab.StatusCodeError); ok {
if sce, ok := cse.Unwrap().(*StatusCodeError); ok {
errorString += sce.Error()
} else {
errorString += err.Error()
Expand Down Expand Up @@ -1543,6 +1574,12 @@ func downloadHTTP(ctx context.Context, te *TransferEngine, callback TransferCall
return 0, 0, "", errors.New("Internal error: implementation is not a http.Client type")
}
httpClient.Transport = transport
headerTimeout := transport.ResponseHeaderTimeout
if headerTimeout > time.Second {
headerTimeout -= 500 * time.Millisecond
} else {
headerTimeout /= 2
}

ctx, cancel := context.WithCancel(ctx)
defer cancel()
Expand Down Expand Up @@ -1578,7 +1615,7 @@ func downloadHTTP(ctx context.Context, te *TransferEngine, callback TransferCall
}
// Set the headers
req.HTTPRequest.Header.Set("X-Transfer-Status", "true")
req.HTTPRequest.Header.Set("X-Pelican-Timeout", param.Transport_ResponseHeaderTimeout.GetDuration().String())
req.HTTPRequest.Header.Set("X-Pelican-Timeout", headerTimeout.Round(time.Millisecond).String())
req.HTTPRequest.Header.Set("TE", "trailers")
req.HTTPRequest.Header.Set("User-Agent", getUserAgent(project))
req = req.WithContext(ctx)
Expand All @@ -1599,11 +1636,17 @@ func downloadHTTP(ctx context.Context, te *TransferEngine, callback TransferCall
// Check the error real quick
if resp.IsComplete() {
if err = resp.Err(); err != nil {
var sce grab.StatusCodeError
if errors.Is(err, grab.ErrBadLength) {
err = fmt.Errorf("Local copy of file is larger than remote copy %w", grab.ErrBadLength)
err = fmt.Errorf("local copy of file is larger than remote copy %w", grab.ErrBadLength)
} else if errors.As(err, &sce) {
log.Debugln("Creating a client status code error")
sce2 := StatusCodeError(sce)
err = &sce2
} else {
err = &ConnectionSetupError{Err: err}
}
log.Errorln("Failed to download:", err)
err = &ConnectionSetupError{Err: err}
return
}
}
Expand Down
4 changes: 3 additions & 1 deletion client/handle_http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -456,7 +456,9 @@ func TestSortAttempts(t *testing.T) {

func TestTimeoutHeaderSetForDownload(t *testing.T) {
viper.Reset()
config.InitConfig()
viper.Set("Transport.ResponseHeaderTimeout", 10*time.Second)
require.NoError(t, config.InitClient())
ctx, _, _ := test_utils.TestContext(context.Background(), t)

// We have this flag because our server will get a few requests throughout its lifetime and the other
Expand All @@ -470,7 +472,7 @@ func TestTimeoutHeaderSetForDownload(t *testing.T) {
if r.Header.Get("X-Pelican-Timeout") == "" {
t.Error("X-Pelican-Timeout header is not set")
}
assert.Equal(t, "10s", r.Header.Get("X-Pelican-Timeout"))
assert.Equal(t, "9.5s", r.Header.Get("X-Pelican-Timeout"))
timeoutHeaderFound = true
}
}))
Expand Down
6 changes: 3 additions & 3 deletions client/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -470,7 +470,7 @@ func DoPut(ctx context.Context, localObject string, remoteDestination string, re
if err != nil {
return
}
tj, err := client.NewTransferJob(remoteDestUrl, localObject, true, recursive, project)
tj, err := client.NewTransferJob(context.Background(), remoteDestUrl, localObject, true, recursive, project)
if err != nil {
return
}
Expand Down Expand Up @@ -559,7 +559,7 @@ func DoGet(ctx context.Context, remoteObject string, localDestination string, re
if err != nil {
return
}
tj, err := tc.NewTransferJob(remoteObjectUrl, localDestination, false, recursive, project)
tj, err := tc.NewTransferJob(context.Background(), remoteObjectUrl, localDestination, false, recursive, project)
if err != nil {
return
}
Expand Down Expand Up @@ -694,7 +694,7 @@ func DoCopy(ctx context.Context, sourceFile string, destination string, recursiv
if err != nil {
return
}
tj, err := tc.NewTransferJob(remoteURL, localPath, isPut, recursive, project)
tj, err := tc.NewTransferJob(context.Background(), remoteURL, localPath, isPut, recursive, project)
if err != nil {
return
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/cache_serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (
)

func serveCache(cmd *cobra.Command, _ []string) error {
cancel, err := launchers.LaunchModules(cmd.Context(), config.CacheType)
_, cancel, err := launchers.LaunchModules(cmd.Context(), config.CacheType)
if err != nil {
cancel()
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/director_serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func serveDirector(cmd *cobra.Command, args []string) error {
modules.Set(config.BrokerType)
}

cancel, err := launchers.LaunchModules(cmd.Context(), config.DirectorType)
_, cancel, err := launchers.LaunchModules(cmd.Context(), config.DirectorType)
if err != nil {
cancel()
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/fed_serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func fedServeStart(cmd *cobra.Command, args []string) error {
}
}

cancel, err := launchers.LaunchModules(cmd.Context(), modules)
_, cancel, err := launchers.LaunchModules(cmd.Context(), modules)
if err != nil {
cancel()
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/fed_serve_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ func TestFedServeCache(t *testing.T) {

require.NoError(t, err)

fedCancel, err := launchers.LaunchModules(ctx, modules)
_, fedCancel, err := launchers.LaunchModules(ctx, modules)
defer fedCancel()
if err != nil {
log.Errorln("Failure in fedServeInternal:", err)
Expand Down
2 changes: 1 addition & 1 deletion cmd/fed_serve_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ func TestFedServePosixOrigin(t *testing.T) {
viper.Set("Registry.RequireCacheApproval", false)
defer cancel()

fedCancel, err := launchers.LaunchModules(ctx, modules)
_, fedCancel, err := launchers.LaunchModules(ctx, modules)

defer fedCancel()
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion cmd/origin_serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import (
)

func serveOrigin(cmd *cobra.Command, args []string) error {
cancel, err := launchers.LaunchModules(cmd.Context(), config.OriginType)
_, cancel, err := launchers.LaunchModules(cmd.Context(), config.OriginType)
if err != nil {
cancel()
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -432,7 +432,7 @@ func runPluginWorker(ctx context.Context, upload bool, workChan <-chan PluginTra
var tj *client.TransferJob
urlCopy := *transfer.url
project := client.GetProjectName()
tj, err = tc.NewTransferJob(&urlCopy, transfer.localFile, upload, false, project, client.WithAcquireToken(false), client.WithCaches(caches...))
tj, err = tc.NewTransferJob(context.Background(), &urlCopy, transfer.localFile, upload, false, project, client.WithAcquireToken(false), client.WithCaches(caches...))
if err != nil {
return errors.Wrap(err, "Failed to create new transfer job")
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/plugin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ func (f *FedTest) Spinup() {
viper.Set("Registry.RequireOriginApproval", false)
viper.Set("Registry.RequireCacheApproval", false)

f.FedCancel, err = launchers.LaunchModules(ctx, modules)
_, f.FedCancel, err = launchers.LaunchModules(ctx, modules)
if err != nil {
f.T.Fatalf("Failure in fedServeInternal: %v", err)
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/registry_serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import (
)

func serveRegistry(cmd *cobra.Command, _ []string) error {
cancel, err := launchers.LaunchModules(cmd.Context(), config.RegistryType)
_, cancel, err := launchers.LaunchModules(cmd.Context(), config.RegistryType)
if err != nil {
cancel()
}
Expand Down
14 changes: 9 additions & 5 deletions daemon/launch_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,20 +161,24 @@ func (launcher DaemonLauncher) Launch(ctx context.Context) (context.Context, int
return ctx_result, cmd.Process.Pid, nil
}

func LaunchDaemons(ctx context.Context, launchers []Launcher, egrp *errgroup.Group) (err error) {
func LaunchDaemons(ctx context.Context, launchers []Launcher, egrp *errgroup.Group) (pids []int, err error) {

daemons := make([]launchInfo, len(launchers))
pids = make([]int, len(launchers))
for idx, daemon := range launchers {
ctx, pid, err := daemon.Launch(ctx)
var newCtx context.Context
var pid int
newCtx, pid, err = daemon.Launch(ctx)
if err != nil {
err = errors.Wrapf(err, "Failed to launch %s daemon", daemon.Name())
// This is secure as long as daemon.Name() is either "xrootd" or "cmsd"
metrics.SetComponentHealthStatus(metrics.HealthStatusComponent(daemon.Name()), metrics.StatusCritical, err.Error())
return err
return
}
daemons[idx].ctx = ctx
daemons[idx].ctx = newCtx
daemons[idx].pid = pid
daemons[idx].name = daemon.Name()
pids[idx] = pid
log.Infoln("Successfully launched", daemon.Name())
metrics.SetComponentHealthStatus(metrics.HealthStatusComponent(daemon.Name()), metrics.StatusOK, "")
}
Expand Down Expand Up @@ -257,5 +261,5 @@ func LaunchDaemons(ctx context.Context, launchers []Launcher, egrp *errgroup.Gro
}
})

return nil
return
}
8 changes: 7 additions & 1 deletion fed_test_utils/fed.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ type (
Token string
Ctx context.Context
Egrp *errgroup.Group
Pids []int
}
)

Expand Down Expand Up @@ -139,9 +140,14 @@ func NewFedTest(t *testing.T, originConfig string) (ft *FedTest) {
err = config.InitServer(ctx, modules)
require.NoError(t, err)

_, err = launchers.LaunchModules(ctx, modules)
servers, _, err := launchers.LaunchModules(ctx, modules)
require.NoError(t, err)

ft.Pids = make([]int, 0, 2)
for _, server := range servers {
ft.Pids = append(ft.Pids, server.GetPids()...)
}

desiredURL := param.Server_ExternalWebUrl.GetString() + "/api/v1.0/health"
err = server_utils.WaitUntilWorking(ctx, "GET", desiredURL, "director", 200)
require.NoError(t, err)
Expand Down
5 changes: 3 additions & 2 deletions launchers/cache_serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,10 +105,11 @@ func CacheServe(ctx context.Context, engine *gin.Engine, egrp *errgroup.Group, m
return nil, err
}

if err = daemon.LaunchDaemons(ctx, launchers, egrp); err != nil {
pids, err := daemon.LaunchDaemons(ctx, launchers, egrp)
if err != nil {
return nil, err
}

cacheServer.SetPids(pids)
return cacheServer, nil
}

Expand Down
Loading

0 comments on commit 3dffcab

Please sign in to comment.