Skip to content
This repository has been archived by the owner on Feb 1, 2023. It is now read-only.

fix: use one less go-routine per session #377

Merged
merged 2 commits into from
May 1, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions bitswap.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,15 +139,19 @@ func New(parent context.Context, network bsnet.BitSwapNetwork,
pm := bspm.New(ctx, peerQueueFactory, network.Self())
pqm := bspqm.New(ctx, network)

sessionFactory := func(sessctx context.Context, id uint64, spm bssession.SessionPeerManager,
sessionFactory := func(
sessctx context.Context,
sessmgr bssession.SessionManager,
id uint64,
spm bssession.SessionPeerManager,
sim *bssim.SessionInterestManager,
pm bssession.PeerManager,
bpm *bsbpm.BlockPresenceManager,
notif notifications.PubSub,
provSearchDelay time.Duration,
rebroadcastDelay delay.D,
self peer.ID) bssm.Session {
return bssession.New(ctx, sessctx, id, spm, pqm, sim, pm, bpm, notif, provSearchDelay, rebroadcastDelay, self)
return bssession.New(sessctx, sessmgr, id, spm, pqm, sim, pm, bpm, notif, provSearchDelay, rebroadcastDelay, self)
}
sessionPeerManagerFactory := func(ctx context.Context, id uint64) bssession.SessionPeerManager {
return bsspm.New(id, network.ConnectionManager())
Expand Down Expand Up @@ -193,6 +197,7 @@ func New(parent context.Context, network bsnet.BitSwapNetwork,
// do it over here to avoid closing before all setup is done.
go func() {
<-px.Closing() // process closes first
sm.Shutdown()
cancelFunc()
notif.Shutdown()
}()
Expand Down
10 changes: 10 additions & 0 deletions internal/blockpresencemanager/blockpresencemanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,3 +109,13 @@ func (bpm *BlockPresenceManager) RemoveKeys(ks []cid.Cid) {
delete(bpm.presence, c)
}
}

// HasKey indicates whether the BlockPresenceManager is tracking the given key
// (used by the tests)
func (bpm *BlockPresenceManager) HasKey(c cid.Cid) bool {
bpm.Lock()
defer bpm.Unlock()

_, ok := bpm.presence[c]
return ok
}
47 changes: 26 additions & 21 deletions internal/session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,14 @@ type PeerManager interface {
SendCancels(context.Context, []cid.Cid)
}

// SessionManager manages all the sessions
type SessionManager interface {
// Remove a session (called when the session shuts down)
RemoveSession(sesid uint64)
// Cancel wants (called when a call to GetBlocks() is cancelled)
CancelSessionWants(sid uint64, wants []cid.Cid)
}

// SessionPeerManager keeps track of peers in the session
type SessionPeerManager interface {
// PeersDiscovered indicates if any peers have been discovered yet
Expand Down Expand Up @@ -91,10 +99,10 @@ type op struct {
// info to, and who to request blocks from.
type Session struct {
// dependencies
bsctx context.Context // context for bitswap
ctx context.Context // context for session
ctx context.Context
shutdown func()
sm SessionManager
pm PeerManager
bpm *bsbpm.BlockPresenceManager
sprm SessionPeerManager
providerFinder ProviderFinder
sim *bssim.SessionInterestManager
Expand Down Expand Up @@ -126,8 +134,8 @@ type Session struct {
// New creates a new bitswap session whose lifetime is bounded by the
// given context.
func New(
bsctx context.Context, // context for bitswap
ctx context.Context, // context for this session
ctx context.Context,
sm SessionManager,
id uint64,
sprm SessionPeerManager,
providerFinder ProviderFinder,
Expand All @@ -138,13 +146,15 @@ func New(
initialSearchDelay time.Duration,
periodicSearchDelay delay.D,
self peer.ID) *Session {

ctx, cancel := context.WithCancel(ctx)
s := &Session{
sw: newSessionWants(broadcastLiveWantsLimit),
tickDelayReqs: make(chan time.Duration),
bsctx: bsctx,
ctx: ctx,
shutdown: cancel,
sm: sm,
pm: pm,
bpm: bpm,
sprm: sprm,
providerFinder: providerFinder,
sim: sim,
Expand All @@ -158,7 +168,7 @@ func New(
periodicSearchDelay: periodicSearchDelay,
self: self,
}
s.sws = newSessionWantSender(id, pm, sprm, bpm, s.onWantsSent, s.onPeersExhausted)
s.sws = newSessionWantSender(id, pm, sprm, sm, bpm, s.onWantsSent, s.onPeersExhausted)

go s.run(ctx)

Expand All @@ -169,6 +179,10 @@ func (s *Session) ID() uint64 {
return s.id
}

func (s *Session) Shutdown() {
s.shutdown()
}

// ReceiveFrom receives incoming blocks from the given peer.
func (s *Session) ReceiveFrom(from peer.ID, ks []cid.Cid, haves []cid.Cid, dontHaves []cid.Cid) {
// The SessionManager tells each Session about all keys that it may be
Expand Down Expand Up @@ -295,6 +309,7 @@ func (s *Session) run(ctx context.Context) {
case opCancel:
// Wants were cancelled
s.sw.CancelPending(oper.keys)
s.sws.Cancel(oper.keys)
case opWantsSent:
// Wants were sent to a peer
s.sw.WantsSent(oper.keys)
Expand Down Expand Up @@ -389,19 +404,9 @@ func (s *Session) handleShutdown() {
// Shut down the sessionWantSender (blocks until sessionWantSender stops
// sending)
s.sws.Shutdown()

// Remove session's interest in the given blocks.
cancelKs := s.sim.RemoveSessionInterest(s.id)

// Free up block presence tracking for keys that no session is interested
// in anymore
s.bpm.RemoveKeys(cancelKs)

// Send CANCEL to all peers for blocks that no session is interested in
// anymore.
// Note: use bitswap context because session context has already been
// cancelled.
s.pm.SendCancels(s.bsctx, cancelKs)
// Signal to the SessionManager that the session has been shutdown
// and can be cleaned up
s.sm.RemoveSession(s.id)
}

// handleReceive is called when the session receives blocks from a peer
Expand Down
109 changes: 86 additions & 23 deletions internal/session/session_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,40 @@ import (
peer "github.com/libp2p/go-libp2p-core/peer"
)

type mockSessionMgr struct {
lk sync.Mutex
removeSession bool
cancels []cid.Cid
}

func newMockSessionMgr() *mockSessionMgr {
return &mockSessionMgr{}
}

func (msm *mockSessionMgr) removeSessionCalled() bool {
msm.lk.Lock()
defer msm.lk.Unlock()
return msm.removeSession
}

func (msm *mockSessionMgr) cancelled() []cid.Cid {
msm.lk.Lock()
defer msm.lk.Unlock()
return msm.cancels
}

func (msm *mockSessionMgr) RemoveSession(sesid uint64) {
msm.lk.Lock()
defer msm.lk.Unlock()
msm.removeSession = true
}

func (msm *mockSessionMgr) CancelSessionWants(sid uint64, wants []cid.Cid) {
msm.lk.Lock()
defer msm.lk.Unlock()
msm.cancels = append(msm.cancels, wants...)
}

func newFakeSessionPeerManager() *bsspm.SessionPeerManager {
return bsspm.New(1, newFakePeerTagger())
}
Expand Down Expand Up @@ -61,8 +95,6 @@ type wantReq struct {

type fakePeerManager struct {
wantReqs chan wantReq
lk sync.Mutex
cancels []cid.Cid
}

func newFakePeerManager() *fakePeerManager {
Expand All @@ -82,16 +114,7 @@ func (pm *fakePeerManager) BroadcastWantHaves(ctx context.Context, cids []cid.Ci
case <-ctx.Done():
}
}
func (pm *fakePeerManager) SendCancels(ctx context.Context, cancels []cid.Cid) {
pm.lk.Lock()
defer pm.lk.Unlock()
pm.cancels = append(pm.cancels, cancels...)
}
func (pm *fakePeerManager) allCancels() []cid.Cid {
pm.lk.Lock()
defer pm.lk.Unlock()
return append([]cid.Cid{}, pm.cancels...)
}
func (pm *fakePeerManager) SendCancels(ctx context.Context, cancels []cid.Cid) {}

func TestSessionGetBlocks(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
Expand All @@ -103,7 +126,8 @@ func TestSessionGetBlocks(t *testing.T) {
notif := notifications.New()
defer notif.Shutdown()
id := testutil.GenerateSessionID()
session := New(ctx, ctx, id, fspm, fpf, sim, fpm, bpm, notif, time.Second, delay.Fixed(time.Minute), "")
sm := newMockSessionMgr()
session := New(ctx, sm, id, fspm, fpf, sim, fpm, bpm, notif, time.Second, delay.Fixed(time.Minute), "")
blockGenerator := blocksutil.NewBlockGenerator()
blks := blockGenerator.Blocks(broadcastLiveWantsLimit * 2)
var cids []cid.Cid
Expand Down Expand Up @@ -181,9 +205,9 @@ func TestSessionGetBlocks(t *testing.T) {

time.Sleep(10 * time.Millisecond)

// Verify wants were cancelled
if len(fpm.allCancels()) != len(blks) {
t.Fatal("expected cancels to be sent for all wants")
// Verify session was removed
if !sm.removeSessionCalled() {
t.Fatal("expected session to be removed")
}
}

Expand All @@ -198,7 +222,8 @@ func TestSessionFindMorePeers(t *testing.T) {
notif := notifications.New()
defer notif.Shutdown()
id := testutil.GenerateSessionID()
session := New(ctx, ctx, id, fspm, fpf, sim, fpm, bpm, notif, time.Second, delay.Fixed(time.Minute), "")
sm := newMockSessionMgr()
session := New(ctx, sm, id, fspm, fpf, sim, fpm, bpm, notif, time.Second, delay.Fixed(time.Minute), "")
session.SetBaseTickDelay(200 * time.Microsecond)
blockGenerator := blocksutil.NewBlockGenerator()
blks := blockGenerator.Blocks(broadcastLiveWantsLimit * 2)
Expand Down Expand Up @@ -272,7 +297,8 @@ func TestSessionOnPeersExhausted(t *testing.T) {
notif := notifications.New()
defer notif.Shutdown()
id := testutil.GenerateSessionID()
session := New(ctx, ctx, id, fspm, fpf, sim, fpm, bpm, notif, time.Second, delay.Fixed(time.Minute), "")
sm := newMockSessionMgr()
session := New(ctx, sm, id, fspm, fpf, sim, fpm, bpm, notif, time.Second, delay.Fixed(time.Minute), "")
blockGenerator := blocksutil.NewBlockGenerator()
blks := blockGenerator.Blocks(broadcastLiveWantsLimit + 5)
var cids []cid.Cid
Expand Down Expand Up @@ -316,7 +342,8 @@ func TestSessionFailingToGetFirstBlock(t *testing.T) {
notif := notifications.New()
defer notif.Shutdown()
id := testutil.GenerateSessionID()
session := New(ctx, ctx, id, fspm, fpf, sim, fpm, bpm, notif, 10*time.Millisecond, delay.Fixed(100*time.Millisecond), "")
sm := newMockSessionMgr()
session := New(ctx, sm, id, fspm, fpf, sim, fpm, bpm, notif, 10*time.Millisecond, delay.Fixed(100*time.Millisecond), "")
blockGenerator := blocksutil.NewBlockGenerator()
blks := blockGenerator.Blocks(4)
var cids []cid.Cid
Expand Down Expand Up @@ -428,10 +455,11 @@ func TestSessionCtxCancelClosesGetBlocksChannel(t *testing.T) {
notif := notifications.New()
defer notif.Shutdown()
id := testutil.GenerateSessionID()
sm := newMockSessionMgr()

// Create a new session with its own context
sessctx, sesscancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
session := New(context.Background(), sessctx, id, fspm, fpf, sim, fpm, bpm, notif, time.Second, delay.Fixed(time.Minute), "")
session := New(sessctx, sm, id, fspm, fpf, sim, fpm, bpm, notif, time.Second, delay.Fixed(time.Minute), "")

timerCtx, timerCancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
defer timerCancel()
Expand Down Expand Up @@ -459,10 +487,44 @@ func TestSessionCtxCancelClosesGetBlocksChannel(t *testing.T) {
case <-timerCtx.Done():
t.Fatal("expected channel to be closed before timeout")
}

time.Sleep(10 * time.Millisecond)

// Expect RemoveSession to be called
if !sm.removeSessionCalled() {
t.Fatal("expected onShutdown to be called")
}
}

func TestSessionOnShutdownCalled(t *testing.T) {
fpm := newFakePeerManager()
fspm := newFakeSessionPeerManager()
fpf := newFakeProviderFinder()
sim := bssim.New()
bpm := bsbpm.New()
notif := notifications.New()
defer notif.Shutdown()
id := testutil.GenerateSessionID()
sm := newMockSessionMgr()

// Create a new session with its own context
sessctx, sesscancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
defer sesscancel()
session := New(sessctx, sm, id, fspm, fpf, sim, fpm, bpm, notif, time.Second, delay.Fixed(time.Minute), "")

// Shutdown the session
session.Shutdown()

time.Sleep(10 * time.Millisecond)

// Expect RemoveSession to be called
if !sm.removeSessionCalled() {
t.Fatal("expected onShutdown to be called")
}
}

func TestSessionReceiveMessageAfterShutdown(t *testing.T) {
ctx, cancelCtx := context.WithTimeout(context.Background(), 10*time.Millisecond)
func TestSessionReceiveMessageAfterCtxCancel(t *testing.T) {
ctx, cancelCtx := context.WithTimeout(context.Background(), 20*time.Millisecond)
fpm := newFakePeerManager()
fspm := newFakeSessionPeerManager()
fpf := newFakeProviderFinder()
Expand All @@ -472,7 +534,8 @@ func TestSessionReceiveMessageAfterShutdown(t *testing.T) {
notif := notifications.New()
defer notif.Shutdown()
id := testutil.GenerateSessionID()
session := New(ctx, ctx, id, fspm, fpf, sim, fpm, bpm, notif, time.Second, delay.Fixed(time.Minute), "")
sm := newMockSessionMgr()
session := New(ctx, sm, id, fspm, fpf, sim, fpm, bpm, notif, time.Second, delay.Fixed(time.Minute), "")
blockGenerator := blocksutil.NewBlockGenerator()
blks := blockGenerator.Blocks(2)
cids := []cid.Cid{blks[0].Cid(), blks[1].Cid()}
Expand Down
Loading