perf(pullstorage): share common iterators in pullstorage #1683

Merged: 1 commit, May 11, 2021
64 changes: 63 additions & 1 deletion pkg/pullsync/pullstorage/pullstorage.go
@@ -7,6 +7,8 @@ package pullstorage
import (
"context"
"errors"
"fmt"
"sync"
"time"

"github.com/ethersphere/bee/pkg/storage"
@@ -40,32 +42,88 @@ type Storer interface {
Has(ctx context.Context, addr swarm.Address) (bool, error)
}

type intervalChunks struct {
chs []swarm.Address
topmost uint64
err error
}

// ps wraps storage.Storer.
type ps struct {
storage.Storer

openSubs map[string][]chan intervalChunks
openSubsMu sync.Mutex
}

// New returns a new pullstorage Storer instance.
func New(storer storage.Storer) Storer {
return &ps{
Storer: storer,
Storer: storer,
openSubs: make(map[string][]chan intervalChunks),
}
}

// IntervalChunks collects chunks for a requested interval.
func (s *ps) IntervalChunks(ctx context.Context, bin uint8, from, to uint64, limit int) (chs []swarm.Address, topmost uint64, err error) {
var (
k = subKey(bin, from, to, limit)
c = make(chan intervalChunks)
)
s.openSubsMu.Lock()
if subs, ok := s.openSubs[k]; ok {
// some subscription already exists, add ours
// and wait for the result
subs = append(subs, c)
s.openSubs[k] = subs
s.openSubsMu.Unlock()
select {
case res := <-c:
// since this is a simple read from a channel, one slow
// peer requesting chunks will not affect another
return res.chs, res.topmost, res.err
case <-ctx.Done():
// note that we don't clean our channel out of the subscriber list here.
// doing so could deadlock: if a notification is already in progress we
// cannot acquire the lock, while the notifying goroutine may still try
// to write to our channel, which we are no longer selecting on.
return nil, 0, ctx.Err()
}
}
s.openSubs[k] = make([]chan intervalChunks, 0)
s.openSubsMu.Unlock()

// call the iterator and iterate until either the upper bound or the
// limit is reached. addresses are collected; topmost is the topmost bin ID.
var (
timer *time.Timer
timerC <-chan time.Time
)

ch, dbClosed, stop := s.SubscribePull(ctx, bin, from, to)
defer func(start time.Time) {
stop()
if timer != nil {
timer.Stop()
}

// tell others about the results
s.openSubsMu.Lock()
for _, c := range s.openSubs[k] {
select {
case c <- intervalChunks{chs: chs, topmost: topmost, err: err}:
default:
// the default case is needed because the waiting goroutine might have
// gone away in the meantime due to context cancellation; a blocking
// write to the channel would then block forever, since no one is
// reading on the other side, deadlocking this goroutine.
}
}
delete(s.openSubs, k)
s.openSubsMu.Unlock()

}(time.Now())

var nomore bool
@@ -141,3 +199,7 @@ func (s *ps) Put(ctx context.Context, mode storage.ModePut, chs ...swarm.Chunk)
_, err := s.Storer.Put(ctx, mode, chs...)
return err
}

func subKey(bin uint8, from, to uint64, limit int) string {
return fmt.Sprintf("%d_%d_%d_%d", bin, from, to, limit)
}
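The change above is a channel-based variant of the single-flight pattern: the first caller for a given (bin, from, to, limit) key runs the SubscribePull iterator, and any caller arriving with the same subKey while that iteration is in flight registers a channel under openSubs and waits for the shared result. Below is a minimal, self-contained sketch of the same idea; the names (sharer, do, waiters) and the example key are illustrative only, and the context-cancellation handling and non-blocking notification of the PR are omitted for brevity.

package main

import (
    "fmt"
    "sync"
    "time"
)

type result struct {
    val string
    err error
}

type sharer struct {
    mu      sync.Mutex
    waiters map[string][]chan result
}

func newSharer() *sharer {
    return &sharer{waiters: make(map[string][]chan result)}
}

// do runs fn at most once per in-flight key; callers that arrive with the
// same key while fn is running wait for and share its result.
func (s *sharer) do(key string, fn func() (string, error)) (string, error) {
    s.mu.Lock()
    if ws, ok := s.waiters[key]; ok {
        // someone is already working on this key: register and wait.
        c := make(chan result)
        s.waiters[key] = append(ws, c)
        s.mu.Unlock()
        r := <-c
        return r.val, r.err
    }
    // we are the first caller: claim the key and do the work.
    s.waiters[key] = nil
    s.mu.Unlock()

    val, err := fn()

    // hand the result to everyone who joined in the meantime.
    s.mu.Lock()
    for _, c := range s.waiters[key] {
        c <- result{val, err}
    }
    delete(s.waiters, key)
    s.mu.Unlock()
    return val, err
}

func main() {
    s := newSharer()
    var wg sync.WaitGroup
    for i := 0; i < 3; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            v, _ := s.do("0_0_5_50", func() (string, error) {
                time.Sleep(100 * time.Millisecond) // stands in for the SubscribePull iteration
                return "chunks 1..5", nil
            })
            fmt.Println(v)
        }()
    }
    wg.Wait()
}

This is essentially the request de-duplication offered by golang.org/x/sync/singleflight; presumably the hand-rolled version was chosen because it lets a waiter drop out on context cancellation, with the non-blocking send in the deferred cleanup skipping any waiter that has already gone away.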
64 changes: 64 additions & 0 deletions pkg/pullsync/pullstorage/pullstorage_test.go
@@ -9,8 +9,10 @@ import (
"crypto/rand"
"errors"
"io/ioutil"
"reflect"
"testing"
"time"
"unsafe"

"github.com/ethersphere/bee/pkg/localstore"
"github.com/ethersphere/bee/pkg/logging"
@@ -298,6 +300,68 @@ func TestIntervalChunks_Localstore(t *testing.T) {
}
}

// TestIntervalChunks_IteratorShare tests that two goroutines
// requesting the same subscription trigger SubscribePull only once
// and that the results are shared between them.
func TestIntervalChunks_IteratorShare(t *testing.T) {
desc := someDescriptors(0, 2)
ps, db := newPullStorage(t, mock.WithSubscribePullChunks(desc...), mock.WithPartialInterval(true))

go func() {
// the delay is needed so that the iterator lingers a bit
// longer, waiting for more chunks.
<-time.After(200 * time.Millisecond)
// add chunks to subscribe pull on the storage mock
db.MorePull(someDescriptors(1, 3, 4)...)
}()

type result struct {
addrs []swarm.Address
top uint64
}
sched := make(chan struct{})
c := make(chan result)

go func() {
close(sched)
addrs, topmost, err := ps.IntervalChunks(context.Background(), 0, 0, 5, limit)
if err != nil {
t.Errorf("internal goroutine: %v", err)
}
c <- result{addrs, topmost}

}()
<-sched // wait for goroutine to get scheduled

addrs, topmost, err := ps.IntervalChunks(context.Background(), 0, 0, 5, limit)
if err != nil {
t.Fatal(err)
}

res := <-c

if l := len(addrs); l != 5 {
t.Fatalf("want %d addrs but got %d", 5, l)
}

// highest chunk we sent had BinID 5
exp := uint64(5)
if topmost != exp {
t.Fatalf("expected topmost %d but got %d", exp, topmost)
}
if c := db.SubscribePullCalls(); c != 1 {
t.Fatalf("want 1 SubscribePull call, got %d", c)
}

// check that both results point to the same underlying array
sh := (*reflect.SliceHeader)(unsafe.Pointer(&res.addrs))
sh2 := (*reflect.SliceHeader)(unsafe.Pointer(&addrs))

if sh.Data != sh2.Data {
t.Fatalf("results not shared between goroutines. ptr1 %d ptr2 %d", sh.Data, sh2.Data)
}
}

func newPullStorage(t *testing.T, o ...mock.Option) (pullstorage.Storer, *mock.MockStorer) {
db := mock.NewStorer(o...)
ps := pullstorage.New(db)
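The test above asserts true sharing, not mere equality, by comparing the Data fields of the two slice headers via reflect and unsafe. Assuming both result slices are non-empty, the same identity check can be expressed without unsafe by comparing the addresses of the first elements; the sameBacking helper below is a hypothetical alternative, not part of the PR.

// sameBacking reports whether two address slices share the same backing
// array by comparing the addresses of their first elements. Only valid
// for non-empty slices.
func sameBacking(a, b []swarm.Address) bool {
    return len(a) > 0 && len(b) > 0 && &a[0] == &b[0]
}

In the test it would read: if !sameBacking(addrs, res.addrs) { t.Fatal("results not shared between goroutines") }.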
8 changes: 8 additions & 0 deletions pkg/pullsync/pullsync.go
@@ -499,6 +499,14 @@ func (s *Syncer) Close() error {
defer close(cc)
s.wg.Wait()
}()

// cancel all contexts
s.ruidMtx.Lock()
for _, c := range s.ruidCtx {
c()
}
s.ruidMtx.Unlock()

select {
case <-cc:
case <-time.After(10 * time.Second):
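The Close change cancels every per-stream context before waiting on the WaitGroup, so in-flight handlers are told to stop instead of being waited on for up to the full timeout. A minimal sketch of the bookkeeping this relies on is shown below; the names tracker, track and cancelAll, as well as the uint32 key type, are illustrative stand-ins for the Syncer's ruidCtx map and ruidMtx mutex.

package main

import (
    "context"
    "fmt"
    "sync"
)

type tracker struct {
    mu      sync.Mutex
    cancels map[uint32]context.CancelFunc
}

func newTracker() *tracker {
    return &tracker{cancels: make(map[uint32]context.CancelFunc)}
}

// track derives a cancellable context for one stream and remembers its
// cancel func under id until the returned cleanup func is called.
func (t *tracker) track(ctx context.Context, id uint32) (context.Context, func()) {
    ctx, cancel := context.WithCancel(ctx)
    t.mu.Lock()
    t.cancels[id] = cancel
    t.mu.Unlock()
    return ctx, func() {
        t.mu.Lock()
        delete(t.cancels, id)
        t.mu.Unlock()
        cancel()
    }
}

// cancelAll aborts every tracked stream, as Close does above before
// waiting for the handler goroutines to drain.
func (t *tracker) cancelAll() {
    t.mu.Lock()
    defer t.mu.Unlock()
    for id, cancel := range t.cancels {
        cancel()
        delete(t.cancels, id)
    }
}

func main() {
    t := newTracker()
    ctx, done := t.track(context.Background(), 42)
    defer done()
    t.cancelAll()          // what Close triggers
    <-ctx.Done()           // the handler observes the cancellation...
    fmt.Println(ctx.Err()) // ...and returns: "context canceled"
}

In this sketch the cleanup func removes the map entry before calling cancel, so a handler that finishes normally does not leave a stale cancel func behind for a later cancelAll to invoke.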
6 changes: 6 additions & 0 deletions pkg/storage/mock/storer.go
@@ -21,6 +21,7 @@ type MockStorer struct {
pinnedAddress []swarm.Address // Stores the pinned address
pinnedCounter []uint64 // and its respective counter. These are stored as slices to preserve the order.
subpull []storage.Descriptor
subpullCalls int
partialInterval bool
morePull chan struct{}
mtx sync.Mutex
@@ -220,6 +221,7 @@ func (m *MockStorer) LastPullSubscriptionBinID(bin uint8) (id uint64, err error)
}

func (m *MockStorer) SubscribePull(ctx context.Context, bin uint8, since, until uint64) (<-chan storage.Descriptor, <-chan struct{}, func()) {
m.subpullCalls++
c := make(chan storage.Descriptor)
done := make(chan struct{})
stop := func() {
@@ -275,6 +277,10 @@ func (m *MockStorer) SubscribePull(ctx context.Context, bin uint8, since, until
return c, m.quit, stop
}

func (m *MockStorer) SubscribePullCalls() int {
return m.subpullCalls
}

func (m *MockStorer) MorePull(d ...storage.Descriptor) {
// clear out what we already have in subpull
m.mtx.Lock()
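One caveat worth noting: in the lines shown, SubscribePull increments subpullCalls before any lock is taken, and SubscribePullCalls reads the counter without locking. The iterator-sharing test only triggers a single subscription and reads the counter after both goroutines have finished, so this is harmless there, but a test driving SubscribePull from several goroutines concurrently could trip the race detector. A hedged sketch of a locked variant, reusing the mock's existing mtx, would be:

// inside SubscribePull, increment under the mock's mutex:
m.mtx.Lock()
m.subpullCalls++
m.mtx.Unlock()

// and read the counter under the same mutex:
func (m *MockStorer) SubscribePullCalls() int {
    m.mtx.Lock()
    defer m.mtx.Unlock()
    return m.subpullCalls
}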