Skip to content

Commit

Permalink
Propogate slowdown from cosmosdb (#6556)
Browse files Browse the repository at this point in the history
  • Loading branch information
johnnyaug authored Sep 7, 2023
1 parent baab576 commit 14bb533
Show file tree
Hide file tree
Showing 24 changed files with 189 additions and 208 deletions.
2 changes: 1 addition & 1 deletion pkg/api/ui_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func handleGatewayRequest(w http.ResponseWriter, r *http.Request, gatewayDomains
err := gwerrors.Codes[gwerrors.ERRLakeFSWrongEndpoint]
err.Description = fmt.Sprintf("%s (%v)", err.Description, gatewayDomains)
o := operations.Operation{}
o.EncodeError(w, r, err)
o.EncodeError(w, r, nil, err)
}

func isGatewayRequest(r *http.Request) bool {
Expand Down
2 changes: 1 addition & 1 deletion pkg/catalog/catalog_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -707,7 +707,7 @@ func createPrepareUncommittedTestScenario(t *testing.T, repositoryID string, num
sort.Slice(records[i], func(ii, jj int) bool {
return bytes.Compare(records[i][ii].Key, records[i][jj].Key) < 0
})
test.StagingManager.EXPECT().List(gomock.Any(), branches[i].StagingToken, gomock.Any()).AnyTimes().Return(cUtils.NewFakeValueIterator(records[i]), nil)
test.StagingManager.EXPECT().List(gomock.Any(), branches[i].StagingToken, gomock.Any()).AnyTimes().Return(cUtils.NewFakeValueIterator(records[i]))
}

if numRecords > 0 {
Expand Down
16 changes: 8 additions & 8 deletions pkg/gateway/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ func OperationHandler(sc *ServerContext, handler operations.AuthenticatedOperati
o := ctx.Value(ContextKeyOperation).(*operations.Operation)
perms, err := handler.RequiredPermissions(req)
if err != nil {
_ = o.EncodeError(w, req, gatewayerrors.ErrAccessDenied.ToAPIErr())
_ = o.EncodeError(w, req, err, gatewayerrors.ErrAccessDenied.ToAPIErr())
return
}
authOp := authorize(w, req, sc.authService, perms)
Expand All @@ -171,7 +171,7 @@ func RepoOperationHandler(sc *ServerContext, handler operations.RepoOperationHan
o := ctx.Value(ContextKeyOperation).(*operations.Operation)
perms, err := handler.RequiredPermissions(req, repo.Name)
if err != nil {
_ = o.EncodeError(w, req, gatewayerrors.ErrAccessDenied.ToAPIErr())
_ = o.EncodeError(w, req, err, gatewayerrors.ErrAccessDenied.ToAPIErr())
return
}
authOp := authorize(w, req, sc.authService, perms)
Expand Down Expand Up @@ -202,9 +202,9 @@ func PathOperationHandler(sc *ServerContext, handler operations.PathOperationHan
perms, err := handler.RequiredPermissions(req, repo.Name, refID, path)
if err != nil {
if errors.Is(err, gatewayerrors.ErrInvalidCopySource) {
_ = o.EncodeError(w, req, gatewayerrors.ErrInvalidCopySource.ToAPIErr())
_ = o.EncodeError(w, req, err, gatewayerrors.ErrInvalidCopySource.ToAPIErr())
} else {
_ = o.EncodeError(w, req, gatewayerrors.ErrAccessDenied.ToAPIErr())
_ = o.EncodeError(w, req, err, gatewayerrors.ErrAccessDenied.ToAPIErr())
}
return
}
Expand Down Expand Up @@ -242,7 +242,7 @@ func authorize(w http.ResponseWriter, req *http.Request, authService auth.Gatewa
user, err := auth.GetUser(ctx)
if err != nil {
o.Log(req).WithError(err).Error("failed to authorize, get user")
_ = o.EncodeError(w, req, gatewayerrors.ErrInternalError.ToAPIErr())
_ = o.EncodeError(w, req, err, gatewayerrors.ErrInternalError.ToAPIErr())
return nil
}
username := user.Username
Expand All @@ -265,7 +265,7 @@ func authorize(w http.ResponseWriter, req *http.Request, authService auth.Gatewa
})
if err != nil {
o.Log(req).WithError(err).Error("failed to authorize")
_ = o.EncodeError(w, req, gatewayerrors.ErrInternalError.ToAPIErr())
_ = o.EncodeError(w, req, err, gatewayerrors.ErrInternalError.ToAPIErr())
return nil
}
if authResp.Error != nil || !authResp.Allowed {
Expand All @@ -274,7 +274,7 @@ func authorize(w http.ResponseWriter, req *http.Request, authService auth.Gatewa
l = l.WithField("key", accessKeyID)
}
l.Warn("no permission")
_ = o.EncodeError(w, req, gatewayerrors.ErrAccessDenied.ToAPIErr())
_ = o.EncodeError(w, req, err, gatewayerrors.ErrAccessDenied.ToAPIErr())
return nil
}
return &operations.AuthorizedOperation{
Expand Down Expand Up @@ -315,6 +315,6 @@ func setDefaultContentType(w http.ResponseWriter, req *http.Request) {
func unsupportedOperationHandler() http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
o := &operations.Operation{}
_ = o.EncodeError(w, req, gatewayerrors.ERRLakeFSNotSupported.ToAPIErr())
_ = o.EncodeError(w, req, nil, gatewayerrors.ERRLakeFSNotSupported.ToAPIErr())
})
}
18 changes: 9 additions & 9 deletions pkg/gateway/middleware.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func AuthenticationHandler(authService auth.GatewayService, next http.Handler) h
authContext, err := authenticator.Parse(req.Context())
if err != nil {
o.Log(req).WithError(err).Warn("failed to parse signature")
_ = o.EncodeError(w, req, getAPIErrOrDefault(err, gatewayerrors.ErrAccessDenied))
_ = o.EncodeError(w, req, err, getAPIErrOrDefault(err, gatewayerrors.ErrAccessDenied))
return
}
accessKeyID := authContext.GetAccessKeyID()
Expand All @@ -48,25 +48,25 @@ func AuthenticationHandler(authService auth.GatewayService, next http.Handler) h
if err != nil {
if !errors.Is(err, auth.ErrNotFound) {
logger.WithError(err).Warn("error getting access key")
_ = o.EncodeError(w, req, gatewayerrors.ErrInternalError.ToAPIErr())
_ = o.EncodeError(w, req, err, gatewayerrors.ErrInternalError.ToAPIErr())
} else {
logger.WithError(err).Warn("could not find access key")
_ = o.EncodeError(w, req, gatewayerrors.ErrAccessDenied.ToAPIErr())
_ = o.EncodeError(w, req, err, gatewayerrors.ErrAccessDenied.ToAPIErr())
}
return
}
err = authenticator.Verify(creds, o.FQDN)
logger = logger.WithField("authenticator", authenticator)
if err != nil {
logger.WithError(err).Warn("error verifying credentials for key")
_ = o.EncodeError(w, req, getAPIErrOrDefault(err, gatewayerrors.ErrAccessDenied))
_ = o.EncodeError(w, req, err, getAPIErrOrDefault(err, gatewayerrors.ErrAccessDenied))
return
}

user, err = authService.GetUser(ctx, creds.Username)
if err != nil {
logger.WithError(err).Warn("could not get user for credentials key")
_ = o.EncodeError(w, req, gatewayerrors.ErrAccessDenied.ToAPIErr())
_ = o.EncodeError(w, req, err, gatewayerrors.ErrAccessDenied.ToAPIErr())
return
}
ctx = logging.AddFields(ctx, logging.Fields{logging.UserFieldKey: user.Username})
Expand Down Expand Up @@ -175,18 +175,18 @@ func EnrichWithRepositoryOrFallback(c catalog.Interface, authService auth.Gatewa
},
})
if authErr != nil || authResp.Error != nil || !authResp.Allowed {
_ = o.EncodeError(w, req, gatewayerrors.ErrAccessDenied.ToAPIErr())
_ = o.EncodeError(w, req, err, gatewayerrors.ErrAccessDenied.ToAPIErr())
return
}
if fallbackProxy != nil {
fallbackProxy.ServeHTTP(w, req)
return
}
_ = o.EncodeError(w, req, gatewayerrors.ErrNoSuchBucket.ToAPIErr())
_ = o.EncodeError(w, req, err, gatewayerrors.ErrNoSuchBucket.ToAPIErr())
return
}
if repo == nil {
_ = o.EncodeError(w, req, gatewayerrors.ErrInternalError.ToAPIErr())
_ = o.EncodeError(w, req, err, gatewayerrors.ErrInternalError.ToAPIErr())
return
}
req = req.WithContext(context.WithValue(ctx, ContextKeyRepository, repo))
Expand All @@ -204,7 +204,7 @@ func OperationLookupHandler(next http.Handler) http.Handler {
if req.Method == http.MethodGet {
o.OperationID = operations.OperationIDListBuckets
} else {
_ = o.EncodeError(w, req, gatewayerrors.ERRLakeFSNotSupported.ToAPIErr())
_ = o.EncodeError(w, req, nil, gatewayerrors.ERRLakeFSNotSupported.ToAPIErr())
return
}
} else {
Expand Down
38 changes: 26 additions & 12 deletions pkg/gateway/operations/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package operations
import (
"bytes"
"encoding/xml"
"errors"
"fmt"
"io"
"net/http"
Expand All @@ -11,9 +12,10 @@ import (
"github.com/treeverse/lakefs/pkg/auth/keys"
"github.com/treeverse/lakefs/pkg/block"
"github.com/treeverse/lakefs/pkg/catalog"
"github.com/treeverse/lakefs/pkg/gateway/errors"
gwerrors "github.com/treeverse/lakefs/pkg/gateway/errors"
"github.com/treeverse/lakefs/pkg/gateway/multipart"
"github.com/treeverse/lakefs/pkg/httputil"
"github.com/treeverse/lakefs/pkg/kv"
"github.com/treeverse/lakefs/pkg/logging"
"github.com/treeverse/lakefs/pkg/permissions"
"github.com/treeverse/lakefs/pkg/upload"
Expand Down Expand Up @@ -132,20 +134,24 @@ func (o *Operation) SetHeaders(w http.ResponseWriter, headers http.Header) {
}
}

func (o *Operation) EncodeError(w http.ResponseWriter, req *http.Request, e errors.APIError) *http.Request {
func (o *Operation) EncodeError(w http.ResponseWriter, req *http.Request, originalError error, fallbackError gwerrors.APIError) *http.Request {
err := fallbackError
if errors.Is(originalError, kv.ErrSlowDown) {
err = gwerrors.ErrSlowDown.ToAPIErr()
}
req, rid := httputil.RequestID(req)
err := EncodeResponse(w, errors.APIErrorResponse{
Code: e.Code,
Message: e.Description,
writeErr := EncodeResponse(w, gwerrors.APIErrorResponse{
Code: err.Code,
Message: err.Description,
BucketName: "",
Key: "",
Resource: "",
Region: o.Region,
RequestID: rid,
HostID: generateHostID(), // just for compatibility, meaningless in our case
}, e.HTTPStatusCode)
if err != nil {
o.Log(req).WithError(err).Error("encoding response failed")
}, err.HTTPStatusCode)
if writeErr != nil {
o.Log(req).WithError(writeErr).Error("encoding response failed")
}
return req
}
Expand All @@ -166,9 +172,13 @@ type RepoOperation struct {
MatchedHost bool
}

func (o *RepoOperation) EncodeError(w http.ResponseWriter, req *http.Request, err errors.APIError) *http.Request {
func (o *RepoOperation) EncodeError(w http.ResponseWriter, req *http.Request, originalError error, fallbackError gwerrors.APIError) *http.Request {
err := fallbackError
if errors.Is(originalError, kv.ErrSlowDown) {
err = gwerrors.ErrSlowDown.ToAPIErr()
}
req, rid := httputil.RequestID(req)
writeErr := EncodeResponse(w, errors.APIErrorResponse{
writeErr := EncodeResponse(w, gwerrors.APIErrorResponse{
Code: err.Code,
Message: err.Description,
BucketName: o.Repository.Name,
Expand All @@ -194,9 +204,13 @@ type PathOperation struct {
Path string
}

func (o *PathOperation) EncodeError(w http.ResponseWriter, req *http.Request, err errors.APIError) *http.Request {
func (o *PathOperation) EncodeError(w http.ResponseWriter, req *http.Request, originalError error, fallbackError gwerrors.APIError) *http.Request {
err := fallbackError
if errors.Is(originalError, kv.ErrSlowDown) {
err = gwerrors.ErrSlowDown.ToAPIErr()
}
req, rid := httputil.RequestID(req)
writeErr := EncodeResponse(w, errors.APIErrorResponse{
writeErr := EncodeResponse(w, gwerrors.APIErrorResponse{
Code: err.Code,
Message: err.Description,
BucketName: o.Repository.Name,
Expand Down
10 changes: 5 additions & 5 deletions pkg/gateway/operations/deleteobject.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,12 @@ func (controller *DeleteObject) HandleAbortMultipartUpload(w http.ResponseWriter
mpu, err := o.MultipartTracker.Get(ctx, uploadID)
if err != nil {
o.Log(req).WithError(err).Error("upload id not found in tracker")
_ = o.EncodeError(w, req, gatewayerrors.Codes.ToAPIErr(gatewayerrors.ErrNoSuchKey))
_ = o.EncodeError(w, req, err, gatewayerrors.Codes.ToAPIErr(gatewayerrors.ErrNoSuchKey))
return
}
if mpu.Path != o.Path {
o.Log(req).Error("could not match multipart upload with multipart tracker record")
_ = o.EncodeError(w, req, gatewayerrors.Codes.ToAPIErr(gatewayerrors.ErrNoSuchKey))
_ = o.EncodeError(w, req, err, gatewayerrors.Codes.ToAPIErr(gatewayerrors.ErrNoSuchKey))
return
}

Expand All @@ -48,7 +48,7 @@ func (controller *DeleteObject) HandleAbortMultipartUpload(w http.ResponseWriter
}, uploadID)
if err != nil {
o.Log(req).WithError(err).Error("could not abort multipart upload")
_ = o.EncodeError(w, req, gatewayerrors.Codes.ToAPIErr(gatewayerrors.ErrInternalError))
_ = o.EncodeError(w, req, err, gatewayerrors.Codes.ToAPIErr(gatewayerrors.ErrInternalError))
return
}

Expand All @@ -75,10 +75,10 @@ func (controller *DeleteObject) Handle(w http.ResponseWriter, req *http.Request,
case errors.Is(err, graveler.ErrNotFound):
lg.WithError(err).Debug("could not delete object, it doesn't exist")
case errors.Is(err, graveler.ErrWriteToProtectedBranch):
_ = o.EncodeError(w, req, gatewayerrors.Codes.ToAPIErr(gatewayerrors.ErrWriteToProtectedBranch))
_ = o.EncodeError(w, req, err, gatewayerrors.Codes.ToAPIErr(gatewayerrors.ErrWriteToProtectedBranch))
case err != nil:
lg.WithError(err).Error("could not delete object")
_ = o.EncodeError(w, req, gatewayerrors.Codes.ToAPIErr(gatewayerrors.ErrInternalError))
_ = o.EncodeError(w, req, err, gatewayerrors.Codes.ToAPIErr(gatewayerrors.ErrInternalError))
return
default:
lg.Debug("object set for deletion")
Expand Down
4 changes: 2 additions & 2 deletions pkg/gateway/operations/deleteobjects.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,11 @@ func (controller *DeleteObjects) Handle(w http.ResponseWriter, req *http.Request
decodedXML := &serde.Delete{}
err := DecodeXMLBody(req.Body, decodedXML)
if err != nil {
_ = o.EncodeError(w, req, gerrors.Codes.ToAPIErr(gerrors.ErrBadRequest))
_ = o.EncodeError(w, req, err, gerrors.Codes.ToAPIErr(gerrors.ErrBadRequest))
return
}
if len(decodedXML.Object) == 0 || len(decodedXML.Object) > maxDeleteObjects {
_ = o.EncodeError(w, req, gerrors.Codes.ToAPIErr(gerrors.ErrMalformedXML))
_ = o.EncodeError(w, req, err, gerrors.Codes.ToAPIErr(gerrors.ErrMalformedXML))
return
}

Expand Down
12 changes: 6 additions & 6 deletions pkg/gateway/operations/getobject.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,15 +50,15 @@ func (controller *GetObject) Handle(w http.ResponseWriter, req *http.Request, o

if errors.Is(err, graveler.ErrNotFound) {
// TODO: create distinction between missing repo & missing key
_ = o.EncodeError(w, req, gatewayerrors.Codes.ToAPIErr(gatewayerrors.ErrNoSuchKey))
_ = o.EncodeError(w, req, err, gatewayerrors.Codes.ToAPIErr(gatewayerrors.ErrNoSuchKey))
return
}
if errors.Is(err, catalog.ErrExpired) {
_ = o.EncodeError(w, req, gatewayerrors.Codes.ToAPIErr(gatewayerrors.ErrNoSuchVersion))
_ = o.EncodeError(w, req, err, gatewayerrors.Codes.ToAPIErr(gatewayerrors.ErrNoSuchVersion))
return
}
if err != nil {
_ = o.EncodeError(w, req, gatewayerrors.Codes.ToAPIErr(gatewayerrors.ErrInternalError))
_ = o.EncodeError(w, req, err, gatewayerrors.Codes.ToAPIErr(gatewayerrors.ErrInternalError))
return
}
o.SetHeader(w, "Last-Modified", httputil.HeaderTimestamp(entry.CreationDate))
Expand All @@ -77,7 +77,7 @@ func (controller *GetObject) Handle(w http.ResponseWriter, req *http.Request, o
if err != nil {
o.Log(req).WithError(err).WithField("range", rangeSpec).Debug("invalid range spec")
if errors.Is(err, httputil.ErrUnsatisfiableRange) {
_ = o.EncodeError(w, req, gatewayerrors.Codes.ToAPIErr(gatewayerrors.ErrInvalidRange))
_ = o.EncodeError(w, req, err, gatewayerrors.Codes.ToAPIErr(gatewayerrors.ErrInvalidRange))
return
}
}
Expand All @@ -93,7 +93,7 @@ func (controller *GetObject) Handle(w http.ResponseWriter, req *http.Request, o
Identifier: entry.PhysicalAddress,
}, entry.Size)
if err != nil {
_ = o.EncodeError(w, req, gatewayerrors.Codes.ToAPIErr(gatewayerrors.ErrInternalError))
_ = o.EncodeError(w, req, err, gatewayerrors.Codes.ToAPIErr(gatewayerrors.ErrInternalError))
return
}
} else {
Expand All @@ -106,7 +106,7 @@ func (controller *GetObject) Handle(w http.ResponseWriter, req *http.Request, o
Identifier: entry.PhysicalAddress,
}, rng.StartOffset, rng.EndOffset)
if err != nil {
_ = o.EncodeError(w, req, gatewayerrors.Codes.ToAPIErr(gatewayerrors.ErrInternalError))
_ = o.EncodeError(w, req, err, gatewayerrors.Codes.ToAPIErr(gatewayerrors.ErrInternalError))
return
}
w.WriteHeader(http.StatusPartialContent)
Expand Down
6 changes: 3 additions & 3 deletions pkg/gateway/operations/headobject.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,17 +29,17 @@ func (controller *HeadObject) Handle(w http.ResponseWriter, req *http.Request, o
if errors.Is(err, graveler.ErrNotFound) {
// TODO: create distinction between missing repo & missing key
o.Log(req).Debug("path not found")
_ = o.EncodeError(w, req, gatewayerrors.Codes.ToAPIErr(gatewayerrors.ErrNoSuchKey))
_ = o.EncodeError(w, req, err, gatewayerrors.Codes.ToAPIErr(gatewayerrors.ErrNoSuchKey))
return
}
if err != nil {
o.Log(req).WithError(err).Error("failed querying path")
_ = o.EncodeError(w, req, gatewayerrors.Codes.ToAPIErr(gatewayerrors.ErrInternalError))
_ = o.EncodeError(w, req, err, gatewayerrors.Codes.ToAPIErr(gatewayerrors.ErrInternalError))
return
}
if entry.Expired {
o.Log(req).WithError(err).Info("querying expired object")
_ = o.EncodeError(w, req, gatewayerrors.Codes.ToAPIErr(gatewayerrors.ErrNoSuchVersion))
_ = o.EncodeError(w, req, err, gatewayerrors.Codes.ToAPIErr(gatewayerrors.ErrNoSuchVersion))
return
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/gateway/operations/listbuckets.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func (controller *ListBuckets) Handle(w http.ResponseWriter, req *http.Request,
// list repositories
repos, hasMore, err := o.Catalog.ListRepositories(req.Context(), -1, "", after)
if err != nil {
_ = o.EncodeError(w, req, errors.Codes.ToAPIErr(errors.ErrInternalError))
_ = o.EncodeError(w, req, err, errors.Codes.ToAPIErr(errors.ErrInternalError))
return
}

Expand Down
Loading

0 comments on commit 14bb533

Please sign in to comment.