Skip to content

Commit

Permalink
Merge pull request PelicanPlatform#1063 from bbockelm/record_attempts
Browse files Browse the repository at this point in the history
Correctly return the transfer attempt information
  • Loading branch information
joereuss12 authored Apr 8, 2024
2 parents ec26ee0 + 5c6c3b4 commit 722b371
Show file tree
Hide file tree
Showing 6 changed files with 160 additions and 99 deletions.
6 changes: 5 additions & 1 deletion client/errorAccum.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,11 +59,15 @@ func NewTransferErrors() *TransferErrors {
}

// AddError records err in the accumulator, stamped with the current time.
// It delegates to AddPastError, so nil errors are handled the same way there.
func (te *TransferErrors) AddError(err error) {
	now := time.Now()
	te.AddPastError(err, now)
}

// AddPastError records err in the accumulator, stamped with the supplied
// timestamp rather than the current time. This lets callers record when the
// failure actually happened (e.g. the end time of a transfer attempt).
//
// A nil err is ignored. The backing slice is always initialized, so
// te.errors is non-nil after the first call even if nothing was recorded.
func (te *TransferErrors) AddPastError(err error, timestamp time.Time) {
	if te.errors == nil {
		te.errors = make([]error, 0)
	}
	if err == nil {
		return
	}
	// Record the error exactly once, using the caller-provided timestamp.
	te.errors = append(te.errors, &TimestampedError{err: err, timestamp: timestamp})
}

Expand Down
4 changes: 2 additions & 2 deletions client/fed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,14 +263,14 @@ func TestCopyAuth(t *testing.T) {
transferResultsUpload, err := client.DoCopy(fed.Ctx, tempFile.Name(), uploadURL, false, client.WithTokenLocation(tempToken.Name()))
assert.NoError(t, err)
if err == nil {
assert.Equal(t, transferResultsUpload[0].TransferredBytes, int64(17))
assert.Equal(t, int64(17), transferResultsUpload[0].TransferredBytes)
}

// Download that same file with GET
transferResultsDownload, err := client.DoCopy(fed.Ctx, uploadURL, t.TempDir(), false, client.WithTokenLocation(tempToken.Name()))
assert.NoError(t, err)
if err == nil {
assert.Equal(t, transferResultsDownload[0].TransferredBytes, transferResultsUpload[0].TransferredBytes)
assert.Equal(t, int64(17), transferResultsDownload[0].TransferredBytes)
}
}
})
Expand Down
203 changes: 121 additions & 82 deletions client/handle_http.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,15 @@ type (
Err error
}

// TransferAttemptError wraps the error from a single transfer attempt with
// information about the service and proxy used, so the rendered message
// identifies where the attempt failed.
TransferAttemptError struct {
// Host of the service the attempt talked to; rendered as "unknown host" when empty.
serviceHost string
// Proxy involved in the attempt, if any; empty when no proxy is reported.
proxyHost string
// True for uploads ("failed upload to ..."), false for downloads ("failed download from ...").
isUpload bool
// True when the failure is attributed to the proxy itself ("due to proxy ...").
isProxyErr bool
// Underlying error; returned by Unwrap and appended to the message when non-nil.
err error
}

// StatusCodeError is a wrapper around grab.StatusCodeError that indicates the server returned
// a non-200 code.
//
Expand All @@ -131,14 +140,14 @@ type (
}

// TransferResult records the outcome of a single transfer attempt.
TransferResult struct {
	Number            int           // indicates which attempt this is (starts at 0)
	TransferFileBytes int64         // number of bytes this attempt transferred
	TimeToFirstByte   time.Duration // how long it took to transfer the first byte
	TransferEndTime   time.Time     // when the attempt ended
	TransferTime      time.Duration // elapsed time spent on this attempt
	Endpoint          string        // host of the endpoint the attempt used
	ServerVersion     string        // version of the server
	Error             error         // what error the attempt returned (if any)
}

clientTransferResults struct {
Expand Down Expand Up @@ -370,6 +379,54 @@ func (e *StatusCodeError) Is(target error) bool {
return int(*sce) == int(*e)
}

// Error renders the attempt failure as a single line of the form
//
//	failed {download from|upload to} <host>[<proxy info>][: <cause>]
//
// An empty service host is shown as "unknown host". When the proxy is blamed
// for the failure the proxy is introduced with " due to ..."; otherwise a
// known proxy is appended as "+proxy=<host>". The wrapped error's message,
// if any, is appended after ": ".
func (tae *TransferAttemptError) Error() (errMsg string) {
	action := "failed download from "
	if tae.isUpload {
		action = "failed upload to "
	}

	host := tae.serviceHost
	if host == "" {
		host = "unknown host"
	}

	var proxyInfo string
	switch {
	case tae.isProxyErr && tae.proxyHost == "":
		proxyInfo = " due to unknown proxy"
	case tae.isProxyErr:
		proxyInfo = " due to proxy " + tae.proxyHost
	case tae.proxyHost != "":
		proxyInfo = "+proxy=" + tae.proxyHost
	}

	errMsg = action + host + proxyInfo
	if cause := tae.err; cause != nil {
		errMsg += ": " + cause.Error()
	}
	return errMsg
}

// Unwrap exposes the underlying error so errors.Is / errors.As can look
// through the attempt wrapper. It returns nil when no cause was recorded.
func (tae *TransferAttemptError) Unwrap() error {
	cause := tae.err
	return cause
}

// Is reports whether target is a *TransferAttemptError describing the same
// kind of attempt: same direction (upload/download), same service host, same
// proxy host and same proxy-blame flag. The wrapped cause is deliberately not
// compared, so two attempts against the same endpoint match even if they
// failed for different reasons.
//
// A target of another type, or a nil *TransferAttemptError on either side,
// never matches; the nil guard avoids dereferencing a typed-nil target.
func (tae *TransferAttemptError) Is(target error) bool {
	other, ok := target.(*TransferAttemptError)
	if !ok || other == nil || tae == nil {
		return false
	}
	return tae.isUpload == other.isUpload &&
		tae.serviceHost == other.serviceHost &&
		tae.isProxyErr == other.isProxyErr &&
		tae.proxyHost == other.proxyHost
}

// newTransferAttemptError builds a TransferAttemptError for one failed attempt.
//
// service and proxy are the hosts to report (either may be empty), isProxyErr
// marks the proxy as the cause of the failure, isUpload selects upload versus
// download wording, and err is the underlying error being wrapped.
func newTransferAttemptError(service string, proxy string, isProxyErr bool, isUpload bool, err error) (tae *TransferAttemptError) {
	return &TransferAttemptError{
		err:         err,
		isUpload:    isUpload,
		isProxyErr:  isProxyErr,
		proxyHost:   proxy,
		serviceHost: service,
	}
}

// hasPort tests whether the host includes a port
func hasPort(host string) bool {
var checkPort = regexp.MustCompile("^.*:[0-9]+$")
Expand Down Expand Up @@ -1302,19 +1359,6 @@ func runTransferWorker(ctx context.Context, workChan <-chan *clientTransferFile,
transferResults = newTransferResults(file.file.job)
transferResults.Scheme = file.file.remoteURL.Scheme
transferResults.Error = err
} else if transferResults.Error == nil {
xferErrors := NewTransferErrors()
lastXferGood := false
for _, attempt := range transferResults.Attempts {
if attempt.Error == nil {
lastXferGood = true
} else {
xferErrors.AddError(attempt.Error)
}
}
if !lastXferGood {
transferResults.Error = xferErrors
}
}
results <- &clientTransferResults{id: file.uuid, results: transferResults}
}
Expand Down Expand Up @@ -1426,6 +1470,11 @@ func sortAttempts(ctx context.Context, path string, attempts []transferAttemptDe
return
}

// Download the object specified in the transfer to the local filesystem
//
// transferResults contains the summary of the multiple attempts.
// err is set _only_ if the function was unable to attempt a transfer (e.g., unable to
// create the destination directory).
func downloadObject(transfer *transferFile) (transferResults TransferResults, err error) {
log.Debugln("Downloading object from", transfer.remoteURL, "to", transfer.localPath)
// Remove the source from the file path
Expand All @@ -1438,11 +1487,10 @@ func downloadObject(transfer *transferFile) (transferResults TransferResults, er
size, attempts := sortAttempts(transfer.job.ctx, transfer.remoteURL.Path, transfer.attempts)

transferResults = newTransferResults(transfer.job)
xferErrors := NewTransferErrors()
success := false
for idx, transferEndpoint := range attempts { // For each transfer attempt (usually 3), try to download via HTTP
var attempt TransferResult
var timeToFirstByte float64
var serverVersion string
attempt.Number = idx // Start with 0
attempt.Endpoint = transferEndpoint.Url.Host
// Make a copy of the transfer endpoint URL; otherwise, when we mutate the pointer, other parallel
Expand All @@ -1451,57 +1499,56 @@ func downloadObject(transfer *transferFile) (transferResults TransferResults, er
transferEndpointUrl.Path = transfer.remoteURL.Path
transferEndpoint.Url = &transferEndpointUrl
transferStartTime := time.Now()
if downloaded, timeToFirstByte, serverVersion, err = downloadHTTP(transfer.ctx, transfer.engine, transfer.callback, transferEndpoint, transfer.localPath, size, transfer.token, transfer.project); err != nil {
log.Debugln("Failed to download:", err)
transferEndTime := time.Now()
transferTime := transferEndTime.Unix() - transferStartTime.Unix()
attempt.TransferTime = transferTime
attemptDownloaded, timeToFirstByte, serverVersion, err := downloadHTTP(
transfer.ctx, transfer.engine, transfer.callback, transferEndpoint, transfer.localPath, size, transfer.token, transfer.project,
)
endTime := time.Now()
attempt.TransferEndTime = endTime
attempt.TransferTime = endTime.Sub(transferStartTime)
attempt.ServerVersion = serverVersion
attempt.TransferFileBytes = attemptDownloaded
attempt.TimeToFirstByte = timeToFirstByte
downloaded += attemptDownloaded

if err != nil {
log.Debugln("Failed to download from", transferEndpoint.Url, ":", err)
var ope *net.OpError
var cse *ConnectionSetupError
errorString := "Failed to download from " + transferEndpoint.Url.Hostname() + ":" +
transferEndpoint.Url.Port() + " "
proxyStr, _ := os.LookupEnv("http_proxy")
if !transferEndpoint.Proxy {
proxyStr = ""
}
serviceStr := attempt.Endpoint
if transferEndpointUrl.Scheme == "unix" {
serviceStr = "local-cache"
}
if errors.As(err, &ope) && ope.Op == "proxyconnect" {
log.Debugln(ope)
AddrString, _ := os.LookupEnv("http_proxy")
if ope.Addr != nil {
AddrString = " " + ope.Addr.String()
proxyStr += "(" + ope.Addr.String() + ")"
}
errorString += "due to proxy " + AddrString + " error: " + ope.Unwrap().Error()
attempt.Error = newTransferAttemptError(serviceStr, proxyStr, true, false, err)
} else if errors.As(err, &cse) {
errorString += "+ proxy=" + strconv.FormatBool(transferEndpoint.Proxy) + ": "
if sce, ok := cse.Unwrap().(*StatusCodeError); ok {
errorString += sce.Error()
attempt.Error = newTransferAttemptError(serviceStr, proxyStr, false, false, sce)
} else {
errorString += err.Error()
attempt.Error = newTransferAttemptError(serviceStr, proxyStr, false, false, err)
}
} else {
errorString += "+ proxy=" + strconv.FormatBool(transferEndpoint.Proxy) +
": " + err.Error()
attempt.Error = newTransferAttemptError(serviceStr, proxyStr, false, false, err)
}
attempt.TransferFileBytes = downloaded
attempt.TimeToFirstByte = timeToFirstByte
attempt.Error = errors.New(errorString)
attempt.TransferEndTime = transferEndTime.Unix()
attempt.ServerVersion = serverVersion
transferResults.Attempts = append(transferResults.Attempts, attempt)
} else { // Success
transferEndTime := time.Now()
transferTime := transferEndTime.Unix() - transferStartTime.Unix()
attempt.TransferEndTime = transferEndTime.Unix()
attempt.TransferTime = transferTime
attempt.TimeToFirstByte = timeToFirstByte
attempt.TransferFileBytes = downloaded
attempt.ServerVersion = serverVersion
xferErrors.AddPastError(attempt.Error, endTime)
}
transferResults.Attempts = append(transferResults.Attempts, attempt)

if err == nil { // Success
log.Debugln("Downloaded bytes:", downloaded)
transferResults.Attempts = append(transferResults.Attempts, attempt)
success = true
break
}
}
transferResults.TransferredBytes = downloaded
if !success {
log.Debugln("Failed to download with HTTP")
transferResults.Error = errors.New("failed to download with HTTP")
transferResults.Error = xferErrors
}
return
}
Expand All @@ -1523,7 +1570,7 @@ func parseTransferStatus(status string) (int, string) {
// Perform the actual download of the file
//
// Returns the downloaded size, time to 1st byte downloaded, serverVersion and an error if there is one
func downloadHTTP(ctx context.Context, te *TransferEngine, callback TransferCallbackFunc, transfer transferAttemptDetails, dest string, totalSize int64, token string, project string) (downloaded int64, timeToFirstByte float64, serverVersion string, err error) {
func downloadHTTP(ctx context.Context, te *TransferEngine, callback TransferCallbackFunc, transfer transferAttemptDetails, dest string, totalSize int64, token string, project string) (downloaded int64, timeToFirstByte time.Duration, serverVersion string, err error) {
defer func() {
if r := recover(); r != nil {
log.Errorln("Panic occurred in downloadHTTP:", r)
Expand Down Expand Up @@ -1690,10 +1737,8 @@ func downloadHTTP(ctx context.Context, te *TransferEngine, callback TransferCall
Loop:
for {
if !timeToFirstByteRecorded && resp.BytesComplete() > 1 {
// We get the time since in nanoseconds:
timeToFirstByteNs := time.Since(downloadStart)
// Convert to seconds
timeToFirstByte = float64(timeToFirstByteNs.Round(time.Millisecond).Milliseconds()) / 1000.0
timeToFirstByte = time.Since(downloadStart)
timeToFirstByteRecorded = true
}
select {
Expand Down Expand Up @@ -1872,6 +1917,7 @@ func (pr *ProgressReader) Size() int64 {
// Upload a single object to the origin
func uploadObject(transfer *transferFile) (transferResult TransferResults, err error) {
log.Debugln("Uploading file to destination", transfer.remoteURL)
xferErrors := NewTransferErrors()
transferResult.job = transfer.job

var sizer Sizer = &ConstantSizer{size: 0}
Expand Down Expand Up @@ -1979,10 +2025,8 @@ func uploadObject(transfer *transferFile) (transferResult TransferResults, err e
Loop:
for {
if !firstByteRecorded && reader.BytesComplete() > 1 {
// We get the time since in nanoseconds:
timeToFirstByteNs := time.Since(uploadStart)
// Convert to seconds
attempt.TimeToFirstByte = float64(timeToFirstByteNs.Round(time.Millisecond).Milliseconds()) / 1000.0
attempt.TimeToFirstByte = time.Since(uploadStart)
firstByteRecorded = true
}
select {
Expand Down Expand Up @@ -2027,27 +2071,22 @@ Loop:
}
}

transferEndTime := time.Now()
if fileInfo.Size() == 0 {
transferResult.Error = lastError
transferEndTime := time.Now()
attempt.TransferEndTime = transferEndTime.Unix()
attempt.TransferTime = attempt.TransferEndTime - transferStartTime.Unix()

// Add our attempt fields
transferResult.Attempts = append(transferResult.Attempts, attempt)
return transferResult, lastError
xferErrors.AddPastError(newTransferAttemptError(dest.Host, "", false, true, lastError), transferEndTime)
transferResult.Error = xferErrors
attempt.Error = lastError
} else {
log.Debugln("Uploaded bytes:", reader.BytesComplete())
transferResult.TransferredBytes = reader.BytesComplete()
transferEndTime := time.Now()
attempt.TransferEndTime = transferEndTime.Unix()
attempt.TransferTime = attempt.TransferEndTime - transferStartTime.Unix()

// Add our attempt fields
transferResult.Attempts = append(transferResult.Attempts, attempt)
return transferResult, lastError
}

bytesComplete := reader.BytesComplete()
log.Debugln("Uploaded bytes:", bytesComplete)
transferResult.TransferredBytes = bytesComplete
attempt.TransferFileBytes = bytesComplete
}
// Add our attempt fields
attempt.TransferEndTime = transferEndTime
attempt.TransferTime = transferEndTime.Sub(transferStartTime)
transferResult.Attempts = append(transferResult.Attempts, attempt)
return transferResult, lastError
}

// Actually perform the HTTP PUT request to the server.
Expand Down
17 changes: 17 additions & 0 deletions client/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -592,6 +592,23 @@ func DoGet(ctx context.Context, remoteObject string, localDestination string, re
}

if !success {
// If there's only a single transfer error, remove the wrapping to provide
// a simpler error message. Results in:
// failed download from local-cache: server returned 404 Not Found
// versus:
// failed to download file: transfer error: failed download from local-cache: server returned 404 Not Found
var te *TransferErrors
if errors.As(err, &te) {
if len(te.Unwrap()) == 1 {
var tae *TransferAttemptError
if errors.As(te.Unwrap()[0], &tae) {
return nil, tae
} else {
return nil, errors.Wrap(err, "failed to download file")
}
}
return nil, te
}
return nil, errors.Wrap(err, "failed to download file")
} else {
return transferResults, err
Expand Down
Loading

0 comments on commit 722b371

Please sign in to comment.