Skip to content
This repository has been archived by the owner on Feb 1, 2023. It is now read-only.

fix: use one less go-routine per session #377

Merged
merged 2 commits into from
May 1, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions bitswap.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,15 +139,15 @@ func New(parent context.Context, network bsnet.BitSwapNetwork,
pm := bspm.New(ctx, peerQueueFactory, network.Self())
pqm := bspqm.New(ctx, network)

sessionFactory := func(sessctx context.Context, id uint64, spm bssession.SessionPeerManager,
sessionFactory := func(sessctx context.Context, onShutdown bssession.OnShutdown, id uint64, spm bssession.SessionPeerManager,
sim *bssim.SessionInterestManager,
pm bssession.PeerManager,
bpm *bsbpm.BlockPresenceManager,
notif notifications.PubSub,
provSearchDelay time.Duration,
rebroadcastDelay delay.D,
self peer.ID) bssm.Session {
return bssession.New(ctx, sessctx, id, spm, pqm, sim, pm, bpm, notif, provSearchDelay, rebroadcastDelay, self)
return bssession.New(ctx, sessctx, onShutdown, id, spm, pqm, sim, pm, bpm, notif, provSearchDelay, rebroadcastDelay, self)
}
sessionPeerManagerFactory := func(ctx context.Context, id uint64) bssession.SessionPeerManager {
return bsspm.New(id, network.ConnectionManager())
Expand Down Expand Up @@ -193,6 +193,7 @@ func New(parent context.Context, network bsnet.BitSwapNetwork,
// do it over here to avoid closing before all setup is done.
go func() {
<-px.Closing() // process closes first
sm.Shutdown()
cancelFunc()
notif.Shutdown()
}()
Expand Down
17 changes: 17 additions & 0 deletions internal/session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,13 +86,17 @@ type op struct {
keys []cid.Cid
}

type OnShutdown func(uint64)

// Session holds state for an individual bitswap transfer operation.
// This allows bitswap to make smarter decisions about who to send wantlist
// info to, and who to request blocks from.
type Session struct {
// dependencies
bsctx context.Context // context for bitswap
ctx context.Context // context for session
shutdown func()
onShutdown OnShutdown
pm PeerManager
bpm *bsbpm.BlockPresenceManager
sprm SessionPeerManager
Expand Down Expand Up @@ -128,6 +132,7 @@ type Session struct {
func New(
bsctx context.Context, // context for bitswap
ctx context.Context, // context for this session
onShutdown OnShutdown,
id uint64,
sprm SessionPeerManager,
providerFinder ProviderFinder,
Expand All @@ -138,11 +143,15 @@ func New(
initialSearchDelay time.Duration,
periodicSearchDelay delay.D,
self peer.ID) *Session {

ctx, cancel := context.WithCancel(ctx)
s := &Session{
sw: newSessionWants(broadcastLiveWantsLimit),
tickDelayReqs: make(chan time.Duration),
bsctx: bsctx,
ctx: ctx,
shutdown: cancel,
onShutdown: onShutdown,
pm: pm,
bpm: bpm,
sprm: sprm,
Expand All @@ -169,6 +178,10 @@ func (s *Session) ID() uint64 {
return s.id
}

func (s *Session) Shutdown() {
s.shutdown()
}

// ReceiveFrom receives incoming blocks from the given peer.
func (s *Session) ReceiveFrom(from peer.ID, ks []cid.Cid, haves []cid.Cid, dontHaves []cid.Cid) {
// The SessionManager tells each Session about all keys that it may be
Expand Down Expand Up @@ -402,6 +415,10 @@ func (s *Session) handleShutdown() {
// Note: use bitswap context because session context has already been
// cancelled.
s.pm.SendCancels(s.bsctx, cancelKs)

// Signal to the SessionManager that the session has been shutdown
// and can be cleaned up
s.onShutdown(s.id)
}

// handleReceive is called when the session receives blocks from a peer
Expand Down
75 changes: 68 additions & 7 deletions internal/session/session_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,25 @@ func (pm *fakePeerManager) allCancels() []cid.Cid {
return append([]cid.Cid{}, pm.cancels...)
}

type onShutdownMonitor struct {
lk sync.Mutex
shutdown bool
}

func (sm *onShutdownMonitor) onShutdown(uint64) {
sm.lk.Lock()
defer sm.lk.Unlock()

sm.shutdown = true
}

func (sm *onShutdownMonitor) shutdownCalled() bool {
sm.lk.Lock()
defer sm.lk.Unlock()

return sm.shutdown
}

func TestSessionGetBlocks(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
fpm := newFakePeerManager()
Expand All @@ -103,7 +122,8 @@ func TestSessionGetBlocks(t *testing.T) {
notif := notifications.New()
defer notif.Shutdown()
id := testutil.GenerateSessionID()
session := New(ctx, ctx, id, fspm, fpf, sim, fpm, bpm, notif, time.Second, delay.Fixed(time.Minute), "")
onShutdown := func(uint64) {}
session := New(ctx, ctx, onShutdown, id, fspm, fpf, sim, fpm, bpm, notif, time.Second, delay.Fixed(time.Minute), "")
blockGenerator := blocksutil.NewBlockGenerator()
blks := blockGenerator.Blocks(broadcastLiveWantsLimit * 2)
var cids []cid.Cid
Expand Down Expand Up @@ -198,7 +218,8 @@ func TestSessionFindMorePeers(t *testing.T) {
notif := notifications.New()
defer notif.Shutdown()
id := testutil.GenerateSessionID()
session := New(ctx, ctx, id, fspm, fpf, sim, fpm, bpm, notif, time.Second, delay.Fixed(time.Minute), "")
onShutdown := func(uint64) {}
session := New(ctx, ctx, onShutdown, id, fspm, fpf, sim, fpm, bpm, notif, time.Second, delay.Fixed(time.Minute), "")
session.SetBaseTickDelay(200 * time.Microsecond)
blockGenerator := blocksutil.NewBlockGenerator()
blks := blockGenerator.Blocks(broadcastLiveWantsLimit * 2)
Expand Down Expand Up @@ -272,7 +293,8 @@ func TestSessionOnPeersExhausted(t *testing.T) {
notif := notifications.New()
defer notif.Shutdown()
id := testutil.GenerateSessionID()
session := New(ctx, ctx, id, fspm, fpf, sim, fpm, bpm, notif, time.Second, delay.Fixed(time.Minute), "")
onShutdown := func(uint64) {}
session := New(ctx, ctx, onShutdown, id, fspm, fpf, sim, fpm, bpm, notif, time.Second, delay.Fixed(time.Minute), "")
blockGenerator := blocksutil.NewBlockGenerator()
blks := blockGenerator.Blocks(broadcastLiveWantsLimit + 5)
var cids []cid.Cid
Expand Down Expand Up @@ -316,7 +338,8 @@ func TestSessionFailingToGetFirstBlock(t *testing.T) {
notif := notifications.New()
defer notif.Shutdown()
id := testutil.GenerateSessionID()
session := New(ctx, ctx, id, fspm, fpf, sim, fpm, bpm, notif, 10*time.Millisecond, delay.Fixed(100*time.Millisecond), "")
onShutdown := func(uint64) {}
session := New(ctx, ctx, onShutdown, id, fspm, fpf, sim, fpm, bpm, notif, 10*time.Millisecond, delay.Fixed(100*time.Millisecond), "")
blockGenerator := blocksutil.NewBlockGenerator()
blks := blockGenerator.Blocks(4)
var cids []cid.Cid
Expand Down Expand Up @@ -429,9 +452,11 @@ func TestSessionCtxCancelClosesGetBlocksChannel(t *testing.T) {
defer notif.Shutdown()
id := testutil.GenerateSessionID()

osm := &onShutdownMonitor{}

// Create a new session with its own context
sessctx, sesscancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
session := New(context.Background(), sessctx, id, fspm, fpf, sim, fpm, bpm, notif, time.Second, delay.Fixed(time.Minute), "")
session := New(context.Background(), sessctx, osm.onShutdown, id, fspm, fpf, sim, fpm, bpm, notif, time.Second, delay.Fixed(time.Minute), "")

timerCtx, timerCancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
defer timerCancel()
Expand Down Expand Up @@ -459,9 +484,44 @@ func TestSessionCtxCancelClosesGetBlocksChannel(t *testing.T) {
case <-timerCtx.Done():
t.Fatal("expected channel to be closed before timeout")
}

time.Sleep(10 * time.Millisecond)

// Expect onShutdown to be called
if !osm.shutdownCalled() {
t.Fatal("expected onShutdown to be called")
}
}

func TestSessionOnShutdownCalled(t *testing.T) {
fpm := newFakePeerManager()
fspm := newFakeSessionPeerManager()
fpf := newFakeProviderFinder()
sim := bssim.New()
bpm := bsbpm.New()
notif := notifications.New()
defer notif.Shutdown()
id := testutil.GenerateSessionID()

osm := &onShutdownMonitor{}

// Create a new session with its own context
sessctx, sesscancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
defer sesscancel()
session := New(context.Background(), sessctx, osm.onShutdown, id, fspm, fpf, sim, fpm, bpm, notif, time.Second, delay.Fixed(time.Minute), "")

// Shutdown the session
session.Shutdown()

time.Sleep(10 * time.Millisecond)

// Expect onShutdown to be called
if !osm.shutdownCalled() {
t.Fatal("expected onShutdown to be called")
}
}

func TestSessionReceiveMessageAfterShutdown(t *testing.T) {
func TestSessionReceiveMessageAfterCtxCancel(t *testing.T) {
ctx, cancelCtx := context.WithTimeout(context.Background(), 10*time.Millisecond)
fpm := newFakePeerManager()
fspm := newFakeSessionPeerManager()
Expand All @@ -472,7 +532,8 @@ func TestSessionReceiveMessageAfterShutdown(t *testing.T) {
notif := notifications.New()
defer notif.Shutdown()
id := testutil.GenerateSessionID()
session := New(ctx, ctx, id, fspm, fpf, sim, fpm, bpm, notif, time.Second, delay.Fixed(time.Minute), "")
onShutdown := func(uint64) {}
session := New(ctx, ctx, onShutdown, id, fspm, fpf, sim, fpm, bpm, notif, time.Second, delay.Fixed(time.Minute), "")
blockGenerator := blocksutil.NewBlockGenerator()
blks := blockGenerator.Blocks(2)
cids := []cid.Cid{blks[0].Cid(), blks[1].Cid()}
Expand Down
28 changes: 15 additions & 13 deletions internal/sessionmanager/sessionmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,11 @@ type Session interface {
exchange.Fetcher
ID() uint64
ReceiveFrom(peer.ID, []cid.Cid, []cid.Cid, []cid.Cid)
Shutdown()
}

// SessionFactory generates a new session for the SessionManager to track.
type SessionFactory func(ctx context.Context, id uint64, sprm bssession.SessionPeerManager, sim *bssim.SessionInterestManager, pm bssession.PeerManager, bpm *bsbpm.BlockPresenceManager, notif notifications.PubSub, provSearchDelay time.Duration, rebroadcastDelay delay.D, self peer.ID) Session
type SessionFactory func(ctx context.Context, onShutdown bssession.OnShutdown, id uint64, sprm bssession.SessionPeerManager, sim *bssim.SessionInterestManager, pm bssession.PeerManager, bpm *bsbpm.BlockPresenceManager, notif notifications.PubSub, provSearchDelay time.Duration, rebroadcastDelay delay.D, self peer.ID) Session

// PeerManagerFactory generates a new peer manager for a session.
type PeerManagerFactory func(ctx context.Context, id uint64) bssession.SessionPeerManager
Expand Down Expand Up @@ -54,6 +55,7 @@ type SessionManager struct {
// New creates a new SessionManager.
func New(ctx context.Context, sessionFactory SessionFactory, sessionInterestManager *bssim.SessionInterestManager, peerManagerFactory PeerManagerFactory,
blockPresenceManager *bsbpm.BlockPresenceManager, peerManager bssession.PeerManager, notif notifications.PubSub, self peer.ID) *SessionManager {

return &SessionManager{
ctx: ctx,
sessionFactory: sessionFactory,
Expand All @@ -73,26 +75,26 @@ func (sm *SessionManager) NewSession(ctx context.Context,
provSearchDelay time.Duration,
rebroadcastDelay delay.D) exchange.Fetcher {
id := sm.GetNextSessionID()
sessionctx, cancel := context.WithCancel(ctx)

pm := sm.peerManagerFactory(sessionctx, id)
session := sm.sessionFactory(sessionctx, id, pm, sm.sessionInterestManager, sm.peerManager, sm.blockPresenceManager, sm.notif, provSearchDelay, rebroadcastDelay, sm.self)
pm := sm.peerManagerFactory(ctx, id)
session := sm.sessionFactory(ctx, sm.removeSession, id, pm, sm.sessionInterestManager, sm.peerManager, sm.blockPresenceManager, sm.notif, provSearchDelay, rebroadcastDelay, sm.self)

sm.sessLk.Lock()
sm.sessions[id] = session
sm.sessLk.Unlock()
go func() {
defer cancel()
select {
case <-sm.ctx.Done():
sm.removeSession(id)
case <-ctx.Done():
sm.removeSession(id)
}
}()

return session
}

func (sm *SessionManager) Shutdown() {
sm.sessLk.Lock()
defer sm.sessLk.Unlock()

for _, ses := range sm.sessions {
ses.Shutdown()
}
}

func (sm *SessionManager) removeSession(sesid uint64) {
sm.sessLk.Lock()
defer sm.sessLk.Unlock()
Expand Down
23 changes: 17 additions & 6 deletions internal/sessionmanager/sessionmanager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ type fakeSession struct {
wantBlocks []cid.Cid
wantHaves []cid.Cid
id uint64
onShutdown bssession.OnShutdown
pm *fakeSesPeerManager
notif notifications.PubSub
}
Expand All @@ -41,6 +42,9 @@ func (fs *fakeSession) ReceiveFrom(p peer.ID, ks []cid.Cid, wantBlocks []cid.Cid
fs.wantBlocks = append(fs.wantBlocks, wantBlocks...)
fs.wantHaves = append(fs.wantHaves, wantHaves...)
}
func (fs *fakeSession) Shutdown() {
go fs.onShutdown(fs.id)
}

type fakeSesPeerManager struct {
}
Expand All @@ -65,6 +69,7 @@ func (fpm *fakePeerManager) SendCancels(ctx context.Context, cancels []cid.Cid)
}

func sessionFactory(ctx context.Context,
onShutdown bssession.OnShutdown,
id uint64,
sprm bssession.SessionPeerManager,
sim *bssim.SessionInterestManager,
Expand All @@ -74,11 +79,17 @@ func sessionFactory(ctx context.Context,
provSearchDelay time.Duration,
rebroadcastDelay delay.D,
self peer.ID) Session {
return &fakeSession{
id: id,
pm: sprm.(*fakeSesPeerManager),
notif: notif,
fs := &fakeSession{
id: id,
onShutdown: onShutdown,
pm: sprm.(*fakeSesPeerManager),
notif: notif,
}
go func() {
<-ctx.Done()
fs.onShutdown(fs.id)
}()
return fs
}

func peerManagerFactory(ctx context.Context, id uint64) bssession.SessionPeerManager {
Expand Down Expand Up @@ -132,7 +143,7 @@ func TestReceiveFrom(t *testing.T) {
}
}

func TestReceiveBlocksWhenManagerContextCancelled(t *testing.T) {
func TestReceiveBlocksWhenManagerShutdown(t *testing.T) {
ctx := context.Background()
ctx, cancel := context.WithCancel(ctx)
defer cancel()
Expand All @@ -154,7 +165,7 @@ func TestReceiveBlocksWhenManagerContextCancelled(t *testing.T) {
sim.RecordSessionInterest(secondSession.ID(), []cid.Cid{block.Cid()})
sim.RecordSessionInterest(thirdSession.ID(), []cid.Cid{block.Cid()})

cancel()
sm.Shutdown()

// wait for sessions to get removed
time.Sleep(10 * time.Millisecond)
Expand Down