Skip to content

Commit

Permalink
use gaul/s3proxy (#1113)
Browse files Browse the repository at this point in the history
  • Loading branch information
johnnyaug authored Dec 28, 2020
1 parent 07e7b31 commit 8731b60
Show file tree
Hide file tree
Showing 5 changed files with 39 additions and 7 deletions.
11 changes: 10 additions & 1 deletion cmd/lakefs/cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"io"
"net/http"
"net/url"
"os"
"os/signal"
"syscall"
Expand Down Expand Up @@ -126,6 +127,14 @@ var runCmd = &cobra.Command{
)

// init gateway server
s3Fallback := cfg.GetS3GatewayFallbackURL()
var s3FallbackURL *url.URL
if s3Fallback != "" {
s3FallbackURL, err = url.Parse(s3Fallback)
if err != nil {
logger.WithError(err).Fatal("Failed to parse s3 fallback URL")
}
}
s3gatewayHandler := gateway.NewHandler(
cfg.GetS3GatewayRegion(),
cataloger,
Expand All @@ -135,8 +144,8 @@ var runCmd = &cobra.Command{
cfg.GetS3GatewayDomainName(),
bufferedCollector,
dedupCleaner,
s3FallbackURL,
)

ctx, cancelFn := context.WithCancel(context.Background())
go bufferedCollector.Run(ctx)

Expand Down
4 changes: 4 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,10 @@ func (c *Config) GetS3GatewayDomainName() string {
return viper.GetString("gateways.s3.domain_name")
}

func (c *Config) GetS3GatewayFallbackURL() string {
return viper.GetString("gateways.s3.fallback_url")
}

func (c *Config) GetListenAddress() string {
return viper.GetString("listen_address")
}
Expand Down
1 change: 1 addition & 0 deletions docs/reference/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ This reference uses `.` to denote the nesting of values.
(`*.s3.local.lakefs.io` always resolves to 127.0.0.1, useful for
local development
* `gateways.s3.region` `(string : "us-east-1")` - AWS region we're pretending to be. Should match the region configuration used in AWS SDK clients
* `gateways.s3.fallback_url` `(string)` - If specified, requests with a non-existing repository will be forwarded to this url.
* `stats.enabled` `(boolean : true)` - Whether or not to periodically collect anonymous usage statistics
{: .ref-list }

Expand Down
29 changes: 23 additions & 6 deletions gateway/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"context"
"errors"
"net/http"
gohttputil "net/http/httputil"
"net/url"
"reflect"
"regexp"
"strconv"
Expand Down Expand Up @@ -47,6 +49,7 @@ type ServerContext struct {
authService simulator.GatewayAuthService
stats stats.Collector
dedupCleaner *dedup.Cleaner
fallbackProxy *gohttputil.ReverseProxy
}

const operationIDNotFound = "not_found_operation"
Expand All @@ -62,6 +65,7 @@ func (c *ServerContext) WithContext(ctx context.Context) *ServerContext {
authService: c.authService,
stats: c.stats,
dedupCleaner: c.dedupCleaner,
fallbackProxy: c.fallbackProxy,
}
}

Expand All @@ -74,7 +78,12 @@ func NewHandler(
bareDomain string,
stats stats.Collector,
dedupCleaner *dedup.Cleaner,
fallbackURL *url.URL,
) http.Handler {
var fallbackProxy *gohttputil.ReverseProxy
if fallbackURL != nil {
fallbackProxy = gohttputil.NewSingleHostReverseProxy(fallbackURL)
}
sc := &ServerContext{
ctx: context.Background(),
cataloger: cataloger,
Expand All @@ -85,6 +94,7 @@ func NewHandler(
authService: authService,
stats: stats,
dedupCleaner: dedupCleaner,
fallbackProxy: fallbackProxy,
}

// setup routes
Expand Down Expand Up @@ -268,8 +278,12 @@ func RepoOperationHandler(sc *ServerContext, repoID string, handler operations.R
// validate repo exists
repo, err := authOp.Cataloger.GetRepository(sc.ctx, repoID)
if errors.Is(err, db.ErrNotFound) {
authOp.Log().WithField("repository", repoID).Warn("the specified repo does not exist")
authOp.EncodeError(gatewayerrors.ErrNoSuchBucket.ToAPIErr())
if sc.fallbackProxy == nil {
authOp.Log().WithField("repository", repoID).Warn("the specified repo does not exist")
authOp.EncodeError(gatewayerrors.ErrNoSuchBucket.ToAPIErr())
} else {
sc.fallbackProxy.ServeHTTP(writer, request)
}
return
}
if err != nil {
Expand Down Expand Up @@ -305,15 +319,18 @@ func PathOperationHandler(sc *ServerContext, repoID, refID, path string, handler
// validate repo exists
repo, err := authOp.Cataloger.GetRepository(sc.ctx, repoID)
if errors.Is(err, db.ErrNotFound) {
authOp.Log().WithField("repository", repoID).Warn("the specified repo does not exist")
authOp.EncodeError(gatewayerrors.Codes.ToAPIErr(gatewayerrors.ErrNoSuchBucket))
if sc.fallbackProxy == nil {
authOp.Log().WithField("repository", repoID).Warn("the specified repo does not exist")
authOp.EncodeError(gatewayerrors.ErrNoSuchBucket.ToAPIErr())
} else {
sc.fallbackProxy.ServeHTTP(writer, request)
}
return
}
if err != nil {
authOp.EncodeError(gatewayerrors.Codes.ToAPIErr(gatewayerrors.ErrInternalError))
authOp.EncodeError(gatewayerrors.ErrInternalError.ToAPIErr())
return
}

// run callback
operation := &operations.PathOperation{
RefOperation: &operations.RefOperation{
Expand Down
1 change: 1 addition & 0 deletions gateway/playback_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@ func getBasicHandler(t *testing.T, authService *simulator.PlayBackMockConf) (htt
authService.BareDomain,
&mockCollector{},
dedupCleaner,
nil,
)

return handler, &dependencies{
Expand Down

0 comments on commit 8731b60

Please sign in to comment.