Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Athens: introduce Single Flight #573

Merged
merged 16 commits into from
Sep 3, 2018
23 changes: 18 additions & 5 deletions cmd/olympus/actions/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,14 @@ import (
"github.com/gomods/athens/pkg/cdn/metadata/azurecdn"
"github.com/gomods/athens/pkg/config/env"
"github.com/gomods/athens/pkg/download"
"github.com/gomods/athens/pkg/download/goget"
"github.com/gomods/athens/pkg/eventlog"
"github.com/gomods/athens/pkg/log"
"github.com/gomods/athens/pkg/module"
"github.com/gomods/athens/pkg/stash"
"github.com/gomods/athens/pkg/storage"
"github.com/gomodule/redigo/redis"
"github.com/rs/cors"
"github.com/spf13/afero"
"github.com/unrolled/secure"
)

Expand Down Expand Up @@ -85,6 +87,7 @@ func App(config *AppConfig) (*buffalo.App, error) {
WorkerOff: true, // TODO(marwan): turned off until worker is being used.
Logger: blggr,
})

// Automatically redirect to SSL
app.Use(ssl.ForceSSL(secure.Options{
SSLRedirect: ENV == "production",
Expand Down Expand Up @@ -123,13 +126,23 @@ func App(config *AppConfig) (*buffalo.App, error) {
app.GET("/healthz", healthHandler)

// Download Protocol
gg, err := goget.New()
goBin := env.GoBinPath()
fs := afero.NewOsFs()
mf, err := module.NewGoGetFetcher(goBin, fs)
if err != nil {
return nil, err
}
dp := download.New(gg, config.Storage, env.GoGetWorkers())
opts := &download.HandlerOpts{Protocol: dp, Logger: lggr, Engine: renderEng}
download.RegisterHandlers(app, opts)
st := stash.New(mf, config.Storage)
dpOpts := &download.Opts{
Storage: config.Storage,
Stasher: st,
GoBinPath: goBin,
Fs: fs,
}
dp := download.New(dpOpts)

handlerOpts := &download.HandlerOpts{Protocol: dp, Logger: lggr, Engine: renderEng}
download.RegisterHandlers(app, handlerOpts)

app.ServeFiles("/", assetsBox) // serve files from the public directory

Expand Down
45 changes: 40 additions & 5 deletions cmd/proxy/actions/app_proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,12 @@ import (
"github.com/gobuffalo/buffalo"
"github.com/gomods/athens/pkg/config/env"
"github.com/gomods/athens/pkg/download"
"github.com/gomods/athens/pkg/download/goget"
"github.com/gomods/athens/pkg/download/addons"
"github.com/gomods/athens/pkg/log"
"github.com/gomods/athens/pkg/module"
"github.com/gomods/athens/pkg/stash"
"github.com/gomods/athens/pkg/storage"
"github.com/spf13/afero"
)

func addProxyRoutes(
Expand All @@ -18,13 +21,45 @@ func addProxyRoutes(
app.GET("/healthz", healthHandler)

// Download Protocol
gg, err := goget.New()
// the download.Protocol and the stash.Stasher interfaces are composable
// in a middleware fashion. Therefore you can separate concerns
// by the functionality: a download.Protocol that just takes care
// of "go getting" things, and another Protocol that just takes care
// of "pooling" requests etc.

// In our case, we'd like to compose both interfaces in a particular
// order to ensure logical ordering of execution.

// Here's the order of an incoming request to the download.Protocol:

// 1. The downloadpool gets hit first, and manages concurrent requests
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I find it overly complicated, do you find it readable?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@michalpristas I 100% agree that it's more complex. I also agree that it's less readable than what we currently have, but that comes with the complexity part.

My first option was just to lump all of this code into the download.go file and that would have been much much harder to read because you'd have *two worker pools and a singleflight all under the same struct. I don't mind returning to that option if you want? Otherwise, let me know if you have other ideas, totally happy to change things :)

Here's what's required:

  1. 2 pools (one for download.Protocol, one for go mod download)
  2. one singleflight for the operation of "go mod download" and "storage.Save"

Copy link
Member

@michalpristas michalpristas Aug 27, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I imagined something simpler e.g (and I am totally not thinking this through, just writing an idea as it goes)

  • the request comes to get/list/whatever - no handling of concurrency just let it come/fail
  • from request, you call X to spin up worker
  • X returns a channel C which will be closed once the module is prepared in Store
  • X checks store for module.v, closes C if it does exist
  • X checks whether work for module.v is already queued, if c,ok := map[module.v]; ok; return c
  • if not it creates a job with a channel passed into goroutine and stored in a local map[module.v]chan. Returns chan
  • spinned up job performs go get, storage.Save, close(C)
  • request waits on <-C, and then serves stored module

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@michalpristas

  1. The request does come to the handler and the handler doesn't know anything about underlying concurrency, including all the download protocols that are not concurrent (i.e. goget)
  2. What is X, is it all 3 requirements together?
  3. From the rest of those points, there's a missing requirement which is separating the download.Protocol Pool and the goget/storage.Save Pool. How would you split those up?
  4. I'm kind of doing everything you're saying, but more separated out than all in one file: X. I'm happy to put them all in X if you'd like :)

Copy link
Member

@michalpristas michalpristas Aug 27, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I dismissed my blocking review. if folks are ok with this design I will not block and will think this through if needed once I'm mentally capable of thinking.
I'm really sorry for the confusion

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@michalpristas no I 100% agree with you. I think we shouldn't rush this PR since it adds a lot of complexity. I'll mark this as do not merge until we have a bunch of people weighing in and maybe proposing different approaches. One idea is that I can have some of the Athens peeps at GopherCon doing a group review and brainstorm some other stuff and bringing it back here. I even don't mind opening another PR (or someone else opening another PR) that does the same thing differently so we can really compare. Thank you!

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey if you manage to get people together and brainstorm it would be great ;-)

// 2. The downloadpool passes the request to its parent Protocol: stasher
// 3. The stasher Protocol checks storage first, and if storage is empty
// it makes a Stash request to the stash.Stasher interface.

// Once the stasher picks up an order, here's how the requests go in order:
// 1. The singleflight picks up the first request and latches duplicate ones.
// 2. The singleflight passes the stash to its parent: stashpool.
// 3. The stashpool manages limiting concurrent requests and passes them to stash.
// 4. The plain stash.New just takes a request from upstream and saves it into storage.
goBin := env.GoBinPath()
fs := afero.NewOsFs()
mf, err := module.NewGoGetFetcher(goBin, fs)
if err != nil {
return err
}
p := download.New(gg, s, env.GoGetWorkers())
opts := &download.HandlerOpts{Protocol: p, Logger: l, Engine: proxy}
download.RegisterHandlers(app, opts)
st := stash.New(mf, s, stash.WithPool(env.GoGetWorkers()), stash.WithSingleflight)

dpOpts := &download.Opts{
Storage: s,
Stasher: st,
GoBinPath: goBin,
Fs: fs,
}
dp := download.New(dpOpts, addons.WithPool(env.ProtocolWorkers()))

handlerOpts := &download.HandlerOpts{Protocol: dp, Logger: l, Engine: proxy}
download.RegisterHandlers(app, handlerOpts)

return nil
}
21 changes: 21 additions & 0 deletions pkg/config/env/go.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,3 +36,24 @@ func GoGetWorkers() int {

return num
}

// ProtocolWorkers returns how many concurrent
// requests can you handle at a time for all
// download protocol paths. This is different from
// GoGetWorkers in that you can potentially serve
// 30 requests to the Download Protocol but only 5
// at a time can stash a module from Upstream to Storage.
func ProtocolWorkers() int {
defaultNum := 30
str := os.Getenv("ATHENS_PROTOCOL_WORKERS")
if str == "" {
return defaultNum
}

num, err := strconv.Atoi(str)
if err != nil {
return defaultNum
}

return num
}
129 changes: 129 additions & 0 deletions pkg/download/addons/with_pool.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
package addons

import (
"context"
"io"

"github.com/gomods/athens/pkg/download"
"github.com/gomods/athens/pkg/errors"
"github.com/gomods/athens/pkg/storage"
)

type withpool struct {
dp download.Protocol

// jobCh is a channel that takes an anonymous
// function that it executes based on the pool's
// business. The design levarages closures
// so that the worker does not need to worry about
// what the type of job it is taking (Info, Zip etc),
// it just regulates functions and executes them
// in a worker-pool fashion.
jobCh chan func()
}

// WithPool takes a download Protocol and a number of workers
// and creates a N worker pool that share all the download.Protocol
// methods.
func WithPool(workers int) download.Wrapper {
return func(dp download.Protocol) download.Protocol {
jobCh := make(chan func())
p := &withpool{dp: dp, jobCh: jobCh}

p.start(workers)
return p
}
}

func (p *withpool) start(numWorkers int) {
for i := 0; i < numWorkers; i++ {
go p.listen()
}
}

func (p *withpool) listen() {
for f := range p.jobCh {
f()
}
}

func (p *withpool) List(ctx context.Context, mod string) ([]string, error) {
const op errors.Op = "pool.List"
var vers []string
var err error
done := make(chan struct{}, 1)
p.jobCh <- func() {
vers, err = p.dp.List(ctx, mod)
close(done)
}
<-done
if err != nil {
return nil, errors.E(op, err)
}

return vers, nil
}

func (p *withpool) Info(ctx context.Context, mod, ver string) ([]byte, error) {
const op errors.Op = "pool.Info"
var info []byte
var err error
done := make(chan struct{}, 1)
p.jobCh <- func() {
info, err = p.dp.Info(ctx, mod, ver)
close(done)
}
<-done
if err != nil {
return nil, errors.E(op, err)
}
return info, nil
}

func (p *withpool) Latest(ctx context.Context, mod string) (*storage.RevInfo, error) {
const op errors.Op = "pool.Latest"
var info *storage.RevInfo
var err error
done := make(chan struct{}, 1)
p.jobCh <- func() {
info, err = p.dp.Latest(ctx, mod)
close(done)
}
<-done
if err != nil {
return nil, errors.E(op, err)
}
return info, nil
}

func (p *withpool) GoMod(ctx context.Context, mod, ver string) ([]byte, error) {
const op errors.Op = "pool.GoMod"
var goMod []byte
var err error
done := make(chan struct{}, 1)
p.jobCh <- func() {
goMod, err = p.dp.GoMod(ctx, mod, ver)
close(done)
}
<-done
if err != nil {
return nil, errors.E(op, err)
}
return goMod, nil
}

func (p *withpool) Zip(ctx context.Context, mod, ver string) (io.ReadCloser, error) {
const op errors.Op = "pool.Zip"
var zip io.ReadCloser
var err error
done := make(chan struct{}, 1)
p.jobCh <- func() {
zip, err = p.dp.Zip(ctx, mod, ver)
close(done)
}
<-done
if err != nil {
return nil, errors.E(op, err)
}
return zip, nil
}
Loading