Skip to content

Commit

Permalink
libindex: fix file access race
Browse files Browse the repository at this point in the history
This moves *all* refcounting operations and file creation/removal under
a single lock.

Signed-off-by: Hank Donnay <hdonnay@redhat.com>
  • Loading branch information
hdonnay committed Feb 4, 2022
1 parent 8f79866 commit b2eca12
Show file tree
Hide file tree
Showing 2 changed files with 156 additions and 67 deletions.
123 changes: 58 additions & 65 deletions libindex/fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"bytes"
"context"
"encoding/hex"
"errors"
"fmt"
"io"
"net/http"
Expand Down Expand Up @@ -51,33 +52,61 @@ func (a *FetchArena) Init(wc *http.Client, root string) {
a.rc = make(map[string]int)
}

func (a *FetchArena) incRef(digest string) error {
a.mu.Lock()
a.rc[digest]++
a.mu.Unlock()
return nil
}

func (a *FetchArena) decRef(digest string) (int, error) {
func (a *FetchArena) forget(digest string) error {
a.mu.Lock()
defer a.mu.Unlock()
a.rc[digest]--
ct := a.rc[digest]
ct, ok := a.rc[digest]
if !ok {
return nil
}
ct--
if ct == 0 {
delete(a.rc, digest)
a.sf.Forget(digest)
return 0, os.Remove(filepath.Join(a.root, digest))
defer a.sf.Forget(digest)
return os.Remove(filepath.Join(a.root, digest))
}
return ct, nil
a.rc[digest] = ct
return nil
}

func (a *FetchArena) filename(l *claircore.Layer) string {
digest := l.Hash.String()
n := filepath.Join(a.root, digest)
a.mu.Lock()
a.rc[digest] = 0
a.mu.Unlock()
return n
// FetchOne does a deduplicated fetch, then increments the refcount and renames
// the file to the permanent place if applicable.
func (a *FetchArena) fetchOne(ctx context.Context, l *claircore.Layer) (do func() error) {
do = func() error {
h := l.Hash.String()
tgt := filepath.Join(a.root, h)
var ff string
select {
case res := <-a.sf.DoChan(h, func() (interface{}, error) {
return a.realizeLayer(ctx, l)
}):
if err := res.Err; err != nil {
return err
}
ff = res.Val.(string)
case <-ctx.Done():
return ctx.Err()
}
a.mu.Lock()
ct, ok := a.rc[h]
if !ok {
// Did the file get removed while we were waiting on the lock?
if _, err := os.Stat(ff); errors.Is(err, os.ErrNotExist) {
a.mu.Unlock()
return do()
}
if err := os.Rename(ff, tgt); err != nil {
a.mu.Unlock()
return err
}
}
defer a.mu.Unlock()
ct++
a.rc[h] = ct
l.SetLocal(tgt)
return nil
}
return do
}

// Close removes all files left in the arena.
Expand Down Expand Up @@ -116,6 +145,8 @@ func (a *FetchArena) Close(ctx context.Context) error {
}

// RealizeLayer is the inner function used inside the singleflight.
//
// The returned value is a temporary filename in the arena.
func (a *FetchArena) realizeLayer(ctx context.Context, l *claircore.Layer) (string, error) {
ctx = baggage.ContextWithValues(ctx,
label.String("component", "libindex/fetchArena.realizeLayer"),
Expand All @@ -139,12 +170,12 @@ func (a *FetchArena) realizeLayer(ctx context.Context, l *claircore.Layer) (stri
want := l.Hash.Checksum()

// Open our target file before hitting the network.
name := a.filename(l)
rm := true
fd, err := os.OpenFile(name, os.O_RDWR|os.O_CREATE|os.O_EXCL, 0600)
fd, err := os.CreateTemp(a.root, "fetch.*")
if err != nil {
return "", fmt.Errorf("fetcher: unable to create file: %w", err)
}
name := fd.Name()
defer func() {
if err := fd.Close(); err != nil {
zlog.Warn(ctx).Err(err).Msg("unable to close layer file")
Expand Down Expand Up @@ -276,68 +307,30 @@ func (a *FetchArena) Fetcher() *FetchProxy {
// This can be unexported if FetchArena gets unexported.
type FetchProxy struct {
a *FetchArena
mu sync.Mutex
clean []string
}

// Fetch populates all the layers locally.
func (p *FetchProxy) Fetch(ctx context.Context, ls []*claircore.Layer) error {
g, ctx := errgroup.WithContext(ctx)
for _, l := range ls {
g.Go(p.fetchOne(ctx, l))
p.clean = make([]string, len(ls))
for i, l := range ls {
p.clean[i] = l.Hash.String()
g.Go(p.a.fetchOne(ctx, l))
}
if err := g.Wait(); err != nil {
return fmt.Errorf("encountered error while fetching a layer: %v", err)
}
return nil
}

// FetchOne runs a fetch though the singleflight while waiting on the passed-in
// context.
func (p *FetchProxy) fetchOne(ctx context.Context, l *claircore.Layer) func() error {
fn := func() (interface{}, error) {
return p.a.realizeLayer(ctx, l)
}
return func() error {
h := l.Hash.String()
select {
case res := <-p.a.sf.DoChan(h, fn):
if err := res.Err; err != nil {
return err
}
fn := res.Val.(string)
if err := l.SetLocal(fn); err != nil {
return err
}
if err := p.addRef(h); err != nil {
return err
}
case <-ctx.Done():
return ctx.Err()
}
return nil
}
}

func (p *FetchProxy) addRef(digest string) error {
if err := p.a.incRef(digest); err != nil {
return err
}
p.mu.Lock()
p.clean = append(p.clean, digest)
p.mu.Unlock()
return nil
}

// Close marks all the layers' backing files as unused.
//
// This method may actually delete the backing files.
func (p *FetchProxy) Close() error {
var err error
p.mu.Lock()
defer p.mu.Unlock()
for _, digest := range p.clean {
_, e := p.a.decRef(digest)
e := p.a.forget(digest)
if e != nil {
if err == nil {
err = e
Expand Down
100 changes: 98 additions & 2 deletions libindex/fetcher_test.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,15 @@
package libindex

import (
"archive/tar"
"context"
"crypto/sha256"
"fmt"
"io"
"math/rand"
"net/http"
"net/http/httptest"
"os"
"path/filepath"
"strconv"
"testing"
Expand Down Expand Up @@ -70,15 +77,15 @@ func TestFetchInvalid(t *testing.T) {
{
name: "no remote path or local path provided",
layer: []*claircore.Layer{
&claircore.Layer{
{
URI: "",
},
},
},
{
name: "path with no scheme",
layer: []*claircore.Layer{
&claircore.Layer{
{
URI: "www.example.com/path/to/tar?query=one",
},
},
Expand All @@ -102,3 +109,92 @@ func TestFetchInvalid(t *testing.T) {
})
}
}

func TestFetchConcurrent(t *testing.T) {
ctx, done := context.WithCancel(context.Background())
defer done()
ctx = zlog.Test(ctx, t)
ls, h := commonLayerServer(t, 100)
srv := httptest.NewUnstartedServer(h)
srv.Start()
for i := range ls {
ls[i].URI = srv.URL + ls[i].URI
}
defer srv.Close()
a := &FetchArena{}
a.Init(srv.Client(), t.TempDir())

subtest := func(a *FetchArena, ls []claircore.Layer) func(*testing.T) {
// Need to make a copy of all our layers.
l := make([]claircore.Layer, len(ls))
copy(l, ls)
// And then turn into pointers for reasons.
ps := make([]*claircore.Layer, len(l))
// Leave the bottom half the same, shuffle the top half.
rand.Shuffle(len(ps), func(i, j int) {
ps[i], ps[j] = &l[j], &l[i]
})
for i := range ps[:len(ps)/2] {
ps[i] = &l[i]
}
f := a.Fetcher()
return func(t *testing.T) {
t.Parallel()
t.Cleanup(func() {
if err := f.Close(); err != nil {
t.Error(err)
}
})
ctx := zlog.Test(ctx, t)
if err := f.Fetch(ctx, ps); err != nil {
t.Error(err)
}
}
}
t.Run("group", func(t *testing.T) {
for i := 0; i < 2; i++ {
t.Run(strconv.Itoa(i), subtest(a, ls))
}
})

if err := a.Close(ctx); err != nil {
t.Error(err)
}
}

func commonLayerServer(t testing.TB, ct int) ([]claircore.Layer, http.Handler) {
t.Helper()
dir := t.TempDir()
ls := make([]claircore.Layer, ct)
for i := 0; i < ct; i++ {
n := strconv.Itoa(i)
f, err := os.Create(filepath.Join(dir, strconv.Itoa(i)))
if err != nil {
t.Fatal(err)
}
h := sha256.New()
w := tar.NewWriter(io.MultiWriter(f, h))
if err := w.WriteHeader(&tar.Header{
Name: n,
Size: 33,
}); err != nil {
t.Fatal(err)
}
fmt.Fprintf(w, "%032d\n", i)

if err := w.Close(); err != nil {
t.Fatal(err)
}
if err := f.Close(); err != nil {
t.Fatal(err)
}
l := &ls[i]
l.URI = "/" + strconv.Itoa(i)
l.Hash, err = claircore.NewDigest("sha256", h.Sum(nil))
l.Headers = make(http.Header)
if err != nil {
t.Fatal(err)
}
}
return ls, http.FileServer(http.Dir(dir))
}

0 comments on commit b2eca12

Please sign in to comment.