Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(pinning): check before writing root address #4558

Merged
merged 5 commits into from
Jan 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pkg/storer/internal/pinning/export_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ var (
ErrInvalidPinCollectionItemSize = errInvalidPinCollectionSize
ErrPutterAlreadyClosed = errPutterAlreadyClosed
ErrCollectionRootAddressIsZero = errCollectionRootAddressIsZero
ErrDuplicatePinCollection = errDuplicatePinCollection
)

var NewUUID = newUUID
Expand Down
16 changes: 15 additions & 1 deletion pkg/storer/internal/pinning/pinning.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ var (
// errCollectionRootAddressIsZero is returned if the putter is closed with a zero
// swarm.Address. Root reference has to be set.
errCollectionRootAddressIsZero = errors.New("pin store: collection root address is zero")
// errDuplicatePinCollection is returned when attempted to pin the same file repeatedly
errDuplicatePinCollection = errors.New("pin store: duplicate pin collection")
)

// creates a new UUID and returns it as a byte slice
Expand Down Expand Up @@ -264,9 +266,21 @@ func (c *collectionPutter) Close(st internal.Storage, writer storage.Writer, roo
c.mtx.Lock()
defer c.mtx.Unlock()

collection := &pinCollectionItem{Addr: root}
has, err := st.IndexStore().Has(collection)

if err != nil {
return fmt.Errorf("pin store: check previous root: %w", err)
}

if has {
// trigger the Cleanup
return errDuplicatePinCollection
}

// Save the root pin reference.
c.collection.Addr = root
err := writer.Put(c.collection)
err = writer.Put(c.collection)
if err != nil {
return fmt.Errorf("pin store: failed updating collection: %w", err)
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This doesn't solve the problem of residual collectionChunks with their corresponding forever-remaining increments of the chunkstore RefCnt. Consider:

  1. Two or more simultaneous operations start pinning the exact same content.
  2. They all do the individual Puts, and the root collection doesn't yet exist, so they all actually store their collectionChunks and invoke chunkstore.Put incrementing RefCnt on those chunks.
  3. One of the pinners actually executes the Close causing the pinCollectionItem to exist. That one "wins" and actually completes the pin.
  4. When the other pinners finally invoke Close, they'll correctly not replace the original pinCollectionItem, but all of those incorrect RefCnt increments are still there causing those chunks to never be able to be removed from the chunkstore.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Basically returning an error on line :276 will make cleanupOnErrWriter wrapper object to perform a cleanup (by calling the Cleanup method) whenever the error is detected

bee/pkg/api/api.go

Lines 870 to 874 in bf56d82

func (r *cleanupOnErrWriter) WriteHeader(statusCode int) {
// if there is an error status returned, cleanup.
if statusCode >= http.StatusBadRequest {
err := r.onErr()
if err != nil {

notice the onErr pointing to the .Cleanup method

bee/pkg/api/bzz.go

Lines 106 to 110 in bf56d82

ow := &cleanupOnErrWriter{
ResponseWriter: w,
onErr: putter.Cleanup,
logger: logger,
}

which in turn will decrement the ref count(s) to the correct value. See the test below.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for that description. I should have followed the error path back out from Close() myself. I'm setting up a testnet environment to verify this on a full OSM tileset load whenever it is merged.

Expand Down
24 changes: 23 additions & 1 deletion pkg/storer/internal/pinning/pinning_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,29 @@ func TestPinStore(t *testing.T) {
}
})

t.Run("duplicate collection", func(t *testing.T) {
root := chunktest.GenerateTestRandomChunk()
putter, err := pinstore.NewCollection(st)
if err != nil {
t.Fatal(err)
}

err = putter.Put(context.Background(), st, st.IndexStore(), root)
if err != nil {
t.Fatal(err)
}

err = putter.Close(st, st.IndexStore(), root.Address())
if err != nil {
t.Fatal(err)
}

err = putter.Close(st, st.IndexStore(), root.Address())
if err == nil || !errors.Is(err, pinstore.ErrDuplicatePinCollection) {
t.Fatalf("unexpected error during CLose, want: %v, got: %v", pinstore.ErrDuplicatePinCollection, err)
}
})

t.Run("zero address close", func(t *testing.T) {
root := chunktest.GenerateTestRandomChunk()
putter, err := pinstore.NewCollection(st)
Expand All @@ -292,7 +315,6 @@ func TestPinStore(t *testing.T) {
if !errors.Is(err, pinstore.ErrCollectionRootAddressIsZero) {
t.Fatalf("unexpected error on close, want: %v, got: %v", pinstore.ErrCollectionRootAddressIsZero, err)
}

})
}

Expand Down
45 changes: 45 additions & 0 deletions pkg/storer/pinstore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,51 @@ func testPinStore(t *testing.T, newStorer func() (*storer.DB, error)) {
verifyPinCollection(t, lstore.Repo(), testCases[0].chunks[0], testCases[0].chunks, true)
})
})

t.Run("duplicate parallel upload does not leave orphaned chunks", func(t *testing.T) {
chunks := chunktesting.GenerateTestRandomChunks(4)

session1, err := lstore.NewCollection(context.TODO())
if err != nil {
t.Fatalf("NewCollection(...): unexpected error: %v", err)
}

session2, err := lstore.NewCollection(context.TODO())
if err != nil {
t.Fatalf("NewCollection2(...): unexpected error: %v", err)
}

for _, ch := range chunks {
err := session2.Put(context.TODO(), ch)
if err != nil {
t.Fatalf("session2.Put(...): unexpected error: %v", err)
t.Fatal(err)
}

err = session1.Put(context.TODO(), ch)
if err != nil {
t.Fatalf("session1.Put(...): unexpected error: %v", err)
t.Fatal(err)
}
}

err = session1.Done(chunks[0].Address())
if err != nil {
t.Fatalf("session1.Done(...): unexpected error: %v", err)
}

err = session2.Done(chunks[0].Address())
if err == nil {
t.Fatalf("session2.Done(...): expected error, got nil")
}

if err := session2.Cleanup(); err != nil {
t.Fatalf("session2.Done(...): unexpected error: %v", err)
}

verifyPinCollection(t, lstore.Repo(), chunks[0], chunks, true)
verifyChunkRefCount(t, lstore.Repo(), chunks)
})
}

func TestPinStore(t *testing.T) {
Expand Down
20 changes: 20 additions & 0 deletions pkg/storer/storer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/ethersphere/bee/pkg/storage/inmemchunkstore"
"github.com/ethersphere/bee/pkg/storage/migration"
"github.com/ethersphere/bee/pkg/storer"
cs "github.com/ethersphere/bee/pkg/storer/internal/chunkstore"
pinstore "github.com/ethersphere/bee/pkg/storer/internal/pinning"
"github.com/ethersphere/bee/pkg/storer/internal/upload"
localmigration "github.com/ethersphere/bee/pkg/storer/migration"
Expand All @@ -44,7 +45,26 @@ func verifyChunks(
t.Fatalf("unexpected chunk has state: want %t have %t", has, hasFound)
}
}
}

func verifyChunkRefCount(
t *testing.T,
repo storage.Repository,
chunks []swarm.Chunk,
) {
t.Helper()

for _, ch := range chunks {
_ = repo.IndexStore().Iterate(storage.Query{
Factory: func() storage.Item { return new(cs.RetrievalIndexItem) },
}, func(r storage.Result) (bool, error) {
entry := r.Entry.(*cs.RetrievalIndexItem)
if entry.Address.Equal(ch.Address()) && entry.RefCnt != 1 {
t.Errorf("chunk %s has refCnt=%d", ch.Address(), entry.RefCnt)
}
return false, nil
})
}
}

func verifySessionInfo(
Expand Down
Loading