Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

pkg/stash: allow nonsemver requests to be resolved properly #1015

Merged
merged 7 commits into from
Jan 8, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 9 additions & 6 deletions pkg/download/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,12 +121,13 @@ func (p *protocol) Info(ctx context.Context, mod, ver string) ([]byte, error) {
ctx, span := observ.StartSpan(ctx, op.String())
defer span.End()
info, err := p.storage.Info(ctx, mod, ver)
var newVer string
if errors.IsNotFoundErr(err) {
err = p.stasher.Stash(ctx, mod, ver)
newVer, err = p.stasher.Stash(ctx, mod, ver)
if err != nil {
return nil, errors.E(op, err)
}
info, err = p.storage.Info(ctx, mod, ver)
info, err = p.storage.Info(ctx, mod, newVer)
}
if err != nil {
return nil, errors.E(op, err)
Expand All @@ -140,12 +141,13 @@ func (p *protocol) GoMod(ctx context.Context, mod, ver string) ([]byte, error) {
ctx, span := observ.StartSpan(ctx, op.String())
defer span.End()
goMod, err := p.storage.GoMod(ctx, mod, ver)
var newVer string
if errors.IsNotFoundErr(err) {
err = p.stasher.Stash(ctx, mod, ver)
newVer, err = p.stasher.Stash(ctx, mod, ver)
if err != nil {
return nil, errors.E(op, err)
}
goMod, err = p.storage.GoMod(ctx, mod, ver)
goMod, err = p.storage.GoMod(ctx, mod, newVer)
}
if err != nil {
return nil, errors.E(op, err)
Expand All @@ -159,12 +161,13 @@ func (p *protocol) Zip(ctx context.Context, mod, ver string) (io.ReadCloser, err
ctx, span := observ.StartSpan(ctx, op.String())
defer span.End()
zip, err := p.storage.Zip(ctx, mod, ver)
var newVer string
if errors.IsNotFoundErr(err) {
err = p.stasher.Stash(ctx, mod, ver)
newVer, err = p.stasher.Stash(ctx, mod, ver)
if err != nil {
return nil, errors.E(op, err)
}
zip, err = p.storage.Zip(ctx, mod, ver)
zip, err = p.storage.Zip(ctx, mod, newVer)
}
if err != nil {
return nil, errors.E(op, err)
Expand Down
1 change: 1 addition & 0 deletions pkg/module/go_get_fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ func (g *goGetFetcher) Fetch(ctx context.Context, mod, ver string) (*storage.Ver
}

var storageVer storage.Version
storageVer.Semver = m.Version
info, err := afero.ReadFile(g.fs, m.Info)
if err != nil {
return nil, errors.E(op, err)
Expand Down
24 changes: 18 additions & 6 deletions pkg/stash/stasher.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,11 @@ import (

// Stasher has the job of taking a module
// from an upstream entity and stashing it to a Storage Backend.
// It also returns a string that represents a semver version of
// what was requested, this is helpful if what was requested
// was a descriptive version such as a branch name or a full commit sha.
type Stasher interface {
Stash(ctx context.Context, mod string, ver string) error
Stash(ctx context.Context, mod string, ver string) (string, error)
}

// Wrapper helps extend the main stasher's functionality with addons.
Expand All @@ -37,7 +40,7 @@ type stasher struct {
storage storage.Backend
}

func (s *stasher) Stash(ctx context.Context, mod, ver string) error {
func (s *stasher) Stash(ctx context.Context, mod, ver string) (string, error) {
const op errors.Op = "stasher.Stash"
_, span := observ.StartSpan(ctx, op.String())
defer span.End()
Expand All @@ -49,14 +52,23 @@ func (s *stasher) Stash(ctx context.Context, mod, ver string) error {

v, err := s.fetchModule(ctx, mod, ver)
if err != nil {
return errors.E(op, err)
return "", errors.E(op, err)
}
defer v.Zip.Close()
err = s.storage.Save(ctx, mod, ver, v.Mod, v.Zip, v.Info)
if v.Semver != ver {
exists, err := s.storage.Exists(ctx, mod, v.Semver)
if err != nil {
return "", errors.E(op, err)
}
if exists {
return v.Semver, nil
}
}
err = s.storage.Save(ctx, mod, v.Semver, v.Mod, v.Zip, v.Info)
if err != nil {
return errors.E(op, err)
return "", errors.E(op, err)
}
return nil
return v.Semver, nil
}

func (s *stasher) fetchModule(ctx context.Context, mod, ver string) (*storage.Version, error) {
Expand Down
109 changes: 109 additions & 0 deletions pkg/stash/stasher_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
package stash

import (
"context"
"io"
"io/ioutil"
"strings"
"testing"

"github.com/gomods/athens/pkg/storage"
)

type stashTest struct {
name string
ver string // the given version
modVer string // the version module.Fetcher returns
shouldCallExists bool // whether storage should be checked before saving
existsResponse bool // the response of storage.Exists if it's called
shouldCallSave bool // whether save or not should be called
}

var stashTests = [...]stashTest{
{
name: "non semver",
ver: "master",
modVer: "v1.2.3",
shouldCallExists: true,
existsResponse: false,
shouldCallSave: true,
},
{
name: "no storage override",
ver: "master",
modVer: "v1.2.3",
shouldCallExists: true,
existsResponse: true,
shouldCallSave: false,
},
{
name: "equal semver",
ver: "v2.0.0",
modVer: "v2.0.0",
shouldCallExists: false,
existsResponse: false,
shouldCallSave: true,
},
}

func TestStash(t *testing.T) {
for _, testCase := range stashTests {
t.Run(testCase.name, func(t *testing.T) {
var ms mockStorage
ms.existsResponse = testCase.existsResponse
var mf mockFetcher
mf.ver = testCase.modVer

s := New(&mf, &ms)
newVersion, err := s.Stash(context.Background(), "module", testCase.ver)
if err != nil {
t.Fatal(err)
}
if newVersion != testCase.modVer {
t.Fatalf("expected Stash to return %v from module.Fetcher but got %v", testCase.modVer, newVersion)
}
if testCase.shouldCallExists != ms.existsCalled {
t.Fatalf("expected a call to storage.Exists to be %v but got %v", testCase.shouldCallExists, ms.existsCalled)
}
if testCase.shouldCallSave {
if ms.givenVersion != testCase.modVer {
t.Fatalf("expected storage.Save to be called with version %v but got %v", testCase.modVer, ms.givenVersion)
}
} else if ms.saveCalled {
t.Fatalf("expected save not to be called")
}
})
}
}

type mockStorage struct {
storage.Backend
existsCalled bool
saveCalled bool
givenVersion string
existsResponse bool
}

func (ms *mockStorage) Save(ctx context.Context, module, version string, mod []byte, zip io.Reader, info []byte) error {
ms.saveCalled = true
ms.givenVersion = version
return nil
}

func (ms *mockStorage) Exists(ctx context.Context, mod, ver string) (bool, error) {
ms.existsCalled = true
return ms.existsResponse, nil
}

type mockFetcher struct {
ver string
}

func (mf *mockFetcher) Fetch(ctx context.Context, mod, ver string) (*storage.Version, error) {
return &storage.Version{
Info: []byte("info"),
Mod: []byte("gomod"),
Zip: ioutil.NopCloser(strings.NewReader("zipfile")),
Semver: mf.ver,
}, nil
}
9 changes: 5 additions & 4 deletions pkg/stash/with_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,20 +40,21 @@ func (s *withpool) listen() {
}
}

func (s *withpool) Stash(ctx context.Context, mod, ver string) error {
func (s *withpool) Stash(ctx context.Context, mod, ver string) (string, error) {
const op errors.Op = "stash.Pool"
ctx, span := observ.StartSpan(ctx, op.String())
defer span.End()
var err error
var newVer string
done := make(chan struct{}, 1)
s.jobCh <- func() {
err = s.stasher.Stash(ctx, mod, ver)
newVer, err = s.stasher.Stash(ctx, mod, ver)
close(done)
}
<-done
if err != nil {
return errors.E(op, err)
return "", errors.E(op, err)
}

return nil
return newVer, nil
}
14 changes: 7 additions & 7 deletions pkg/stash/with_pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,26 +7,26 @@ import (
)

func TestPoolWrapper(t *testing.T) {
m := &mockStasher{inputMod: "mod", inputVer: "ver", err: fmt.Errorf("wrapped err")}
m := &mockPoolStasher{inputMod: "mod", inputVer: "ver", err: fmt.Errorf("wrapped err")}
s := WithPool(2)(m)
err := s.Stash(context.Background(), m.inputMod, m.inputVer)
_, err := s.Stash(context.Background(), m.inputMod, m.inputVer)
if err.Error() != m.err.Error() {
t.Fatalf("expected err to be `%v` but got `%v`", m.err, err)
}
}

type mockStasher struct {
type mockPoolStasher struct {
inputMod string
inputVer string
err error
}

func (m *mockStasher) Stash(ctx context.Context, mod, ver string) error {
func (m *mockPoolStasher) Stash(ctx context.Context, mod, ver string) (string, error) {
if m.inputMod != mod {
return fmt.Errorf("expected input mod %v but got %v", m.inputMod, mod)
return "", fmt.Errorf("expected input mod %v but got %v", m.inputMod, mod)
}
if m.inputVer != ver {
return fmt.Errorf("expected input ver %v but got %v", m.inputVer, ver)
return "", fmt.Errorf("expected input ver %v but got %v", m.inputVer, ver)
}
return m.err
return "", m.err
}
22 changes: 14 additions & 8 deletions pkg/stash/with_singleflight.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,45 +17,51 @@ import (
func WithSingleflight(s Stasher) Stasher {
sf := &withsf{}
sf.stasher = s
sf.subs = map[string][]chan error{}
sf.subs = map[string][]chan *sfResp{}

return sf
}

type sfResp struct {
newVer string
err error
}

type withsf struct {
stasher Stasher

mu sync.Mutex
subs map[string][]chan error
subs map[string][]chan *sfResp
}

func (s *withsf) process(ctx context.Context, mod, ver string) {
mv := config.FmtModVer(mod, ver)
err := s.stasher.Stash(ctx, mod, ver)
newVer, err := s.stasher.Stash(ctx, mod, ver)
s.mu.Lock()
defer s.mu.Unlock()
for _, ch := range s.subs[mv] {
ch <- err
ch <- &sfResp{newVer, err}
}
delete(s.subs, mv)
}

func (s *withsf) Stash(ctx context.Context, mod, ver string) error {
func (s *withsf) Stash(ctx context.Context, mod, ver string) (string, error) {
const op errors.Op = "singleflight.Stash"
ctx, span := observ.StartSpan(ctx, op.String())
defer span.End()

mv := config.FmtModVer(mod, ver)
s.mu.Lock()
subCh := make(chan error, 1)
subCh := make(chan *sfResp, 1)
_, inFlight := s.subs[mv]
if !inFlight {
s.subs[mv] = []chan error{subCh}
s.subs[mv] = []chan *sfResp{subCh}
go s.process(ctx, mod, ver)
} else {
s.subs[mv] = append(s.subs[mv], subCh)
}
s.mu.Unlock()

return <-subCh
resp := <-subCh
return resp.newVer, resp.err
}
12 changes: 7 additions & 5 deletions pkg/stash/with_singleflight_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ func TestSingleFlight(t *testing.T) {
var eg errgroup.Group
for i := 0; i < 5; i++ {
eg.Go(func() error {
return s.Stash(context.Background(), "mod", "ver")
_, err := s.Stash(context.Background(), "mod", "ver")
return err
})
}

Expand All @@ -31,7 +32,8 @@ func TestSingleFlight(t *testing.T) {

for i := 0; i < 5; i++ {
eg.Go(func() error {
return s.Stash(context.Background(), "mod", "ver")
_, err := s.Stash(context.Background(), "mod", "ver")
return err
})
}
err = eg.Wait()
Expand All @@ -50,13 +52,13 @@ type mockSFStasher struct {
num int
}

func (ms *mockSFStasher) Stash(ctx context.Context, mod, ver string) error {
func (ms *mockSFStasher) Stash(ctx context.Context, mod, ver string) (string, error) {
time.Sleep(time.Millisecond * 100) // allow for second requests to come in.
ms.mu.Lock()
defer ms.mu.Unlock()
if ms.num == 0 {
ms.num++
return nil
return "", nil
}
return fmt.Errorf("second time error")
return "", fmt.Errorf("second time error")
}
7 changes: 4 additions & 3 deletions pkg/storage/version.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@ import "io"

// Version represents a version of a module and contains .mod file, a .info file and zip file of a specific version
type Version struct {
Mod []byte
Zip io.ReadCloser
Info []byte
Mod []byte
Zip io.ReadCloser
Info []byte
Semver string
}