Skip to content
This repository has been archived by the owner on Feb 1, 2023. It is now read-only.

Context-based sessions #401

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 16 additions & 10 deletions bitswap.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ func New(parent context.Context, network bsnet.BitSwapNetwork,
wm = bswm.New(ctx, pm, sim, bpm)
pqm := bspqm.New(ctx, network)

sessionFactory := func(ctx context.Context, id uint64, spm bssession.SessionPeerManager,
sessionFactory := func(ctx context.Context, id exchange.SessionID, spm bssession.SessionPeerManager,
sim *bssim.SessionInterestManager,
pm bssession.PeerManager,
bpm *bsbpm.BlockPresenceManager,
Expand All @@ -151,7 +151,7 @@ func New(parent context.Context, network bsnet.BitSwapNetwork,
self peer.ID) bssm.Session {
return bssession.New(ctx, id, wm, spm, pqm, sim, pm, bpm, notif, provSearchDelay, rebroadcastDelay, self)
}
sessionPeerManagerFactory := func(ctx context.Context, id uint64) bssession.SessionPeerManager {
sessionPeerManagerFactory := func(ctx context.Context, id exchange.SessionID) bssession.SessionPeerManager {
return bsspm.New(id, network.ConnectionManager())
}
notif := notifications.New()
Expand Down Expand Up @@ -303,7 +303,7 @@ func (bs *Bitswap) LedgerForPeer(p peer.ID) *decision.Receipt {
// resources, provide a context with a reasonably short deadline (ie. not one
// that lasts throughout the lifetime of the server)
func (bs *Bitswap) GetBlocks(ctx context.Context, keys []cid.Cid) (<-chan blocks.Block, error) {
session := bs.sm.NewSession(ctx, bs.provSearchDelay, bs.rebroadcastDelay)
session := bs.sm.GetSession(ctx, bs.provSearchDelay, bs.rebroadcastDelay)
return session.GetBlocks(ctx, keys)
}

Expand Down Expand Up @@ -525,12 +525,18 @@ func (bs *Bitswap) IsOnline() bool {
return true
}

// NewSession generates a new Bitswap session. You should use this, rather
// that calling Bitswap.GetBlocks, any time you intend to do several related
// block requests in a row. The session returned will have it's own GetBlocks
// method, but the session will use the fact that the requests are related to
// be more efficient in its requests to peers. If you are using a session
// from go-blockservice, it will create a bitswap session automatically.
// NewSession generates a new Bitswap session or returns the session associated
// with the passed context (created with exchange.NewSession(ctx)).
//
// You should construct a session either with this function or
// exchange.NewSession instead of repeatedly calling Bitswap.GetBlock(s) any
// time you intend to do several related block requests in a row. The session
// will use the fact that the requests are related to be more efficient in its
// requests to peers.
//
// Note: If you've already wrapped your context with exchange.NewSession, you do
// not need to call this function. When you call the GetBlock(s) functions with
// that context, it will use the associated session.
func (bs *Bitswap) NewSession(ctx context.Context) exchange.Fetcher {
return bs.sm.NewSession(ctx, bs.provSearchDelay, bs.rebroadcastDelay)
return bs.sm.GetSession(ctx, bs.provSearchDelay, bs.rebroadcastDelay)
}
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ require (
github.com/ipfs/go-ipfs-blockstore v0.1.4
github.com/ipfs/go-ipfs-blocksutil v0.0.1
github.com/ipfs/go-ipfs-delay v0.0.1
github.com/ipfs/go-ipfs-exchange-interface v0.0.1
github.com/ipfs/go-ipfs-exchange-interface v0.0.2-0.20200423051052-ccb5de3a8346
github.com/ipfs/go-ipfs-routing v0.1.0
github.com/ipfs/go-ipfs-util v0.0.1
github.com/ipfs/go-log v1.0.2
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -127,8 +127,8 @@ github.com/ipfs/go-ipfs-ds-help v0.0.1 h1:QBg+Ts2zgeemK/dB0saiF/ykzRGgfoFMT90Rzo
github.com/ipfs/go-ipfs-ds-help v0.0.1/go.mod h1:gtP9xRaZXqIQRh1HRpp595KbBEdgqWFxefeVKOV8sxo=
github.com/ipfs/go-ipfs-ds-help v0.1.1 h1:IW/bXGeaAZV2VH0Kuok+Ohva/zHkHmeLFBxC1k7mNPc=
github.com/ipfs/go-ipfs-ds-help v0.1.1/go.mod h1:SbBafGJuGsPI/QL3j9Fc5YPLeAu+SzOkI0gFwAg+mOs=
github.com/ipfs/go-ipfs-exchange-interface v0.0.1 h1:LJXIo9W7CAmugqI+uofioIpRb6rY30GUu7G6LUfpMvM=
github.com/ipfs/go-ipfs-exchange-interface v0.0.1/go.mod h1:c8MwfHjtQjPoDyiy9cFquVtVHkO9b9Ob3FG91qJnWCM=
github.com/ipfs/go-ipfs-exchange-interface v0.0.2-0.20200423051052-ccb5de3a8346 h1:/PkmI6g/CMnuC0dbw0jVh0b1kaU2F07fX5MawGyjaJ8=
github.com/ipfs/go-ipfs-exchange-interface v0.0.2-0.20200423051052-ccb5de3a8346/go.mod h1:uChpHomshgrKU+8Uh6y1bh3SKn0jBRrdz4tHGKdDYao=
github.com/ipfs/go-ipfs-pq v0.0.2 h1:e1vOOW6MuOwG2lqxcLA+wEn93i/9laCY8sXAw76jFOY=
github.com/ipfs/go-ipfs-pq v0.0.2/go.mod h1:LWIqQpqfRG3fNc5XsnIhz/wQ2XXGyugQwls7BgUmUfY=
github.com/ipfs/go-ipfs-routing v0.1.0 h1:gAJTT1cEeeLj6/DlLX6t+NxD9fQe2ymTO6qWRDI/HQQ=
Expand Down
15 changes: 8 additions & 7 deletions internal/peermanager/peermanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"sync"

exchange "github.com/ipfs/go-ipfs-exchange-interface"
logging "github.com/ipfs/go-log"
"github.com/ipfs/go-metrics-interface"

Expand All @@ -23,7 +24,7 @@ type PeerQueue interface {
}

type Session interface {
ID() uint64
ID() exchange.SessionID
SignalAvailability(peer.ID, bool)
}

Expand All @@ -42,8 +43,8 @@ type PeerManager struct {
ctx context.Context

psLk sync.RWMutex
sessions map[uint64]Session
peerSessions map[peer.ID]map[uint64]struct{}
sessions map[exchange.SessionID]Session
peerSessions map[peer.ID]map[exchange.SessionID]struct{}

self peer.ID
}
Expand All @@ -58,8 +59,8 @@ func New(ctx context.Context, createPeerQueue PeerQueueFactory, self peer.ID) *P
ctx: ctx,
self: self,

sessions: make(map[uint64]Session),
peerSessions: make(map[peer.ID]map[uint64]struct{}),
sessions: make(map[exchange.SessionID]Session),
peerSessions: make(map[peer.ID]map[exchange.SessionID]struct{}),
}
}

Expand Down Expand Up @@ -202,7 +203,7 @@ func (pm *PeerManager) RegisterSession(p peer.ID, s Session) bool {
}

if _, ok := pm.peerSessions[p]; !ok {
pm.peerSessions[p] = make(map[uint64]struct{})
pm.peerSessions[p] = make(map[exchange.SessionID]struct{})
}
pm.peerSessions[p][s.ID()] = struct{}{}

Expand All @@ -212,7 +213,7 @@ func (pm *PeerManager) RegisterSession(p peer.ID, s Session) bool {

// UnregisterSession tells the PeerManager that the given session is no longer
// interested in PeerManager events.
func (pm *PeerManager) UnregisterSession(ses uint64) {
func (pm *PeerManager) UnregisterSession(ses exchange.SessionID) {
pm.psLk.Lock()
defer pm.psLk.Unlock()

Expand Down
9 changes: 5 additions & 4 deletions internal/peermanager/peermanager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (

"github.com/ipfs/go-bitswap/internal/testutil"
cid "github.com/ipfs/go-cid"
exchange "github.com/ipfs/go-ipfs-exchange-interface"

"github.com/libp2p/go-libp2p-core/peer"
)
Expand Down Expand Up @@ -255,19 +256,19 @@ func TestSendCancels(t *testing.T) {
}
}

func (s *sess) ID() uint64 {
func (s *sess) ID() exchange.SessionID {
return s.id
}
func (s *sess) SignalAvailability(p peer.ID, isAvailable bool) {
s.available[p] = isAvailable
}

type sess struct {
id uint64
id exchange.SessionID
available map[peer.ID]bool
}

func newSess(id uint64) *sess {
func newSess(id exchange.SessionID) *sess {
return &sess{id, make(map[peer.ID]bool)}
}

Expand All @@ -281,7 +282,7 @@ func TestSessionRegistration(t *testing.T) {
self, p1, p2 := tp[0], tp[1], tp[2]
peerManager := New(ctx, peerQueueFactory, self)

id := uint64(1)
id := testutil.GenerateSessionID()
s := newSess(id)
peerManager.RegisterSession(p1, s)
if s.available[p1] {
Expand Down
13 changes: 7 additions & 6 deletions internal/session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
blocks "github.com/ipfs/go-block-format"
cid "github.com/ipfs/go-cid"
delay "github.com/ipfs/go-ipfs-delay"
exchange "github.com/ipfs/go-ipfs-exchange-interface"
logging "github.com/ipfs/go-log"
peer "github.com/libp2p/go-libp2p-core/peer"
loggables "github.com/libp2p/go-libp2p-loggables"
Expand All @@ -30,10 +31,10 @@ const (
type WantManager interface {
// BroadcastWantHaves sends want-haves to all connected peers (used for
// session discovery)
BroadcastWantHaves(context.Context, uint64, []cid.Cid)
BroadcastWantHaves(context.Context, exchange.SessionID, []cid.Cid)
// RemoveSession removes the session from the WantManager (when the
// session shuts down)
RemoveSession(context.Context, uint64)
RemoveSession(context.Context, exchange.SessionID)
}

// PeerManager keeps track of which sessions are interested in which peers
Expand All @@ -44,7 +45,7 @@ type PeerManager interface {
RegisterSession(peer.ID, bspm.Session) bool
// UnregisterSession tells the PeerManager that the session is no longer
// interested in a peer's connection state
UnregisterSession(uint64)
UnregisterSession(exchange.SessionID)
// SendWants tells the PeerManager to send wants to the given peer
SendWants(ctx context.Context, peerId peer.ID, wantBlocks []cid.Cid, wantHaves []cid.Cid)
}
Expand Down Expand Up @@ -122,15 +123,15 @@ type Session struct {
// identifiers
notif notifications.PubSub
uuid logging.Loggable
id uint64
id exchange.SessionID

self peer.ID
}

// New creates a new bitswap session whose lifetime is bounded by the
// given context.
func New(ctx context.Context,
id uint64,
id exchange.SessionID,
wm WantManager,
sprm SessionPeerManager,
providerFinder ProviderFinder,
Expand Down Expand Up @@ -166,7 +167,7 @@ func New(ctx context.Context,
return s
}

func (s *Session) ID() uint64 {
func (s *Session) ID() exchange.SessionID {
return s.id
}

Expand Down
10 changes: 6 additions & 4 deletions internal/session/session_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
cid "github.com/ipfs/go-cid"
blocksutil "github.com/ipfs/go-ipfs-blocksutil"
delay "github.com/ipfs/go-ipfs-delay"
exchange "github.com/ipfs/go-ipfs-exchange-interface"
peer "github.com/libp2p/go-libp2p-core/peer"
)

Expand All @@ -31,16 +32,16 @@ func newFakeWantManager() *fakeWantManager {
}
}

func (fwm *fakeWantManager) BroadcastWantHaves(ctx context.Context, sesid uint64, cids []cid.Cid) {
func (fwm *fakeWantManager) BroadcastWantHaves(ctx context.Context, sesid exchange.SessionID, cids []cid.Cid) {
select {
case fwm.wantReqs <- wantReq{cids}:
case <-ctx.Done():
}
}
func (fwm *fakeWantManager) RemoveSession(context.Context, uint64) {}
func (fwm *fakeWantManager) RemoveSession(context.Context, exchange.SessionID) {}

func newFakeSessionPeerManager() *bsspm.SessionPeerManager {
return bsspm.New(1, newFakePeerTagger())
return bsspm.New(testutil.GenerateSessionID(), newFakePeerTagger())
}

type fakePeerTagger struct {
Expand Down Expand Up @@ -86,7 +87,7 @@ func newFakePeerManager() *fakePeerManager {
func (pm *fakePeerManager) RegisterSession(peer.ID, bspm.Session) bool {
return true
}
func (pm *fakePeerManager) UnregisterSession(uint64) {}
func (pm *fakePeerManager) UnregisterSession(exchange.SessionID) {}
func (pm *fakePeerManager) SendWants(context.Context, peer.ID, []cid.Cid, []cid.Cid) {}

func TestSessionGetBlocks(t *testing.T) {
Expand Down Expand Up @@ -459,6 +460,7 @@ func TestSessionReceiveMessageAfterShutdown(t *testing.T) {
notif := notifications.New()
defer notif.Shutdown()
id := testutil.GenerateSessionID()

session := New(ctx, id, fwm, fpm, fpf, sim, newFakePeerManager(), bpm, notif, time.Second, delay.Fixed(time.Minute), "")
blockGenerator := blocksutil.NewBlockGenerator()
blks := blockGenerator.Blocks(2)
Expand Down
7 changes: 4 additions & 3 deletions internal/session/sessionwantsender.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"

bsbpm "github.com/ipfs/go-bitswap/internal/blockpresencemanager"
exchange "github.com/ipfs/go-ipfs-exchange-interface"

cid "github.com/ipfs/go-cid"
peer "github.com/libp2p/go-libp2p-core/peer"
Expand Down Expand Up @@ -79,7 +80,7 @@ type sessionWantSender struct {
// finished shutting down
closed chan struct{}
// The session ID
sessionID uint64
sessionID exchange.SessionID
// A channel that collects incoming changes (events)
changes chan change
// Information about each want indexed by CID
Expand All @@ -102,7 +103,7 @@ type sessionWantSender struct {
onPeersExhausted onPeersExhaustedFn
}

func newSessionWantSender(sid uint64, pm PeerManager, spm SessionPeerManager,
func newSessionWantSender(sid exchange.SessionID, pm PeerManager, spm SessionPeerManager,
bpm *bsbpm.BlockPresenceManager, onSend onSendFn, onPeersExhausted onPeersExhaustedFn) sessionWantSender {

ctx, cancel := context.WithCancel(context.Background())
Expand All @@ -127,7 +128,7 @@ func newSessionWantSender(sid uint64, pm PeerManager, spm SessionPeerManager,
return sws
}

func (sws *sessionWantSender) ID() uint64 {
func (sws *sessionWantSender) ID() exchange.SessionID {
return sws.sessionID
}

Expand Down
Loading