This repository has been archived by the owner on Feb 1, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 112
/
Copy pathpeermanager.go
137 lines (111 loc) · 3.17 KB
/
peermanager.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
package peermanager
import (
"context"
"sync"
bsmsg "github.com/ipfs/go-bitswap/message"
wantlist "github.com/ipfs/go-bitswap/wantlist"
logging "github.com/ipfs/go-log"
peer "github.com/libp2p/go-libp2p-peer"
)
// log is the package-level logger, created under the name "bitswap".
var log = logging.Logger("bitswap")
var (
// metricsBuckets holds six ascending float64 boundaries: 64, 1024, 16384,
// 262144, 262159 (that is, 1<<18 plus 15) and 4194304.
// NOTE(review): presumably histogram bucket bounds for a size metric, but
// nothing in this file reads the variable — confirm it is used elsewhere in
// the package or remove it.
metricsBuckets = []float64{1 << 6, 1 << 10, 1 << 14, 1 << 18, 1<<18 + 15, 1 << 22}
)
// PeerQueue provides a queue of messages to be sent for a single peer.
//
// PeerManager drives a queue as follows: Startup is called once, right after
// the factory returns the queue; AddWantlist is called from Connected when no
// connection is yet counted for the peer; AddMessage is called from
// SendMessage; Shutdown is called from Disconnected once the peer's
// connection count is no longer positive.
type PeerQueue interface {
// AddMessage queues the given entries for the peer.
// NOTE(review): ses is presumably the ID of the originating session —
// confirm against the implementations.
AddMessage(entries []bsmsg.Entry, ses uint64)
// Startup is invoked once by PeerManager before any other method.
Startup()
// AddWantlist passes the initial set of wants supplied to Connected.
AddWantlist(initialWants *wantlist.SessionTrackedWantlist)
// Shutdown is invoked after the queue has been removed from the pool.
Shutdown()
}
// PeerQueueFactory provides a function that will create a PeerQueue.
// PeerManager calls it with its own context and the ID of the peer the queue
// is for, the first time that peer is seen.
type PeerQueueFactory func(ctx context.Context, p peer.ID) PeerQueue
// peerMessage describes a value that can be applied to a PeerManager.
// NOTE(review): no implementation or caller is visible in this file — it looks
// like a leftover from an earlier design; confirm before relying on it.
type peerMessage interface {
// handle is given the PeerManager the message should act on.
handle(pm *PeerManager)
}
// peerQueueInstance pairs a peer's PeerQueue with a count of the connections
// reported for that peer.
//
// getOrCreate builds this struct with a positional literal, so the field order
// must not change.
type peerQueueInstance struct {
// refcnt starts at zero, is incremented by Connected and decremented by
// Disconnected; the instance is dropped once it is no longer positive.
refcnt int
// pq is the queue the factory created for this peer.
pq PeerQueue
}
// PeerManager manages a pool of peers and sends messages to peers in the pool.
type PeerManager struct {
// peerQueues maps each known peer to its queue and connection count.
// Every access in this file happens while peerQueuesLk is held.
peerQueues map[peer.ID]*peerQueueInstance
// peerQueuesLk guards peerQueues and the refcnt of the instances in it.
peerQueuesLk sync.RWMutex
// createPeerQueue builds the queue for a peer that is not in the pool yet.
createPeerQueue PeerQueueFactory
// ctx is the context passed to New; it is forwarded to createPeerQueue.
ctx context.Context
}
// New returns a PeerManager with an empty peer pool.
//
// ctx is kept and handed to createPeerQueue whenever a queue has to be built
// for a peer that is not yet in the pool.
func New(ctx context.Context, createPeerQueue PeerQueueFactory) *PeerManager {
	pm := new(PeerManager)
	pm.ctx = ctx
	pm.createPeerQueue = createPeerQueue
	pm.peerQueues = map[peer.ID]*peerQueueInstance{}
	return pm
}
// ConnectedPeers returns the IDs of every peer currently in the pool.
//
// The result is a fresh, non-nil slice in map-iteration (i.e. unspecified)
// order, taken under the read lock.
func (pm *PeerManager) ConnectedPeers() []peer.ID {
	pm.peerQueuesLk.RLock()
	defer pm.peerQueuesLk.RUnlock()

	ids := make([]peer.ID, len(pm.peerQueues))
	next := 0
	for id := range pm.peerQueues {
		ids[next] = id
		next++
	}
	return ids
}
// Connected records a new connection to peer p.
//
// A queue for p is created and started if the pool has none. When no
// connection is counted against the queue yet — it was just created, or it was
// created earlier by a targeted SendMessage — initialWants is handed to it via
// AddWantlist before the connection is counted. Calls for a peer that already
// has a counted connection only increase the count.
//
// The write lock is released with defer so that a panic raised by the queue
// factory or by AddWantlist cannot leave peerQueuesLk held, which would block
// every later call on this PeerManager.
func (pm *PeerManager) Connected(p peer.ID, initialWants *wantlist.SessionTrackedWantlist) {
	pm.peerQueuesLk.Lock()
	defer pm.peerQueuesLk.Unlock()

	pqi := pm.getOrCreate(p)
	if pqi.refcnt == 0 {
		pqi.pq.AddWantlist(initialWants)
	}
	pqi.refcnt++
}
// Disconnected records that one connection to peer p has gone away.
//
// Unknown peers are ignored. Otherwise the peer's connection count is
// decremented; once it is no longer positive the peer is removed from the pool
// and its queue is shut down. Shutdown runs after the lock has been released.
func (pm *PeerManager) Disconnected(p peer.ID) {
	pm.peerQueuesLk.Lock()
	inst, known := pm.peerQueues[p]
	if known {
		inst.refcnt--
	}
	last := known && inst.refcnt <= 0
	if last {
		delete(pm.peerQueues, p)
	}
	pm.peerQueuesLk.Unlock()

	if last {
		inst.pq.Shutdown()
	}
}
// SendMessage queues entries on behalf of session from.
//
// With an empty targets slice (nil or zero length) the entries go to every
// peer currently in the pool, under the read lock. Otherwise they go to each
// listed peer in order; a listed peer that is not in the pool gets a queue
// created and started for it, and the lock is dropped before AddMessage is
// called on that queue.
func (pm *PeerManager) SendMessage(entries []bsmsg.Entry, targets []peer.ID, from uint64) {
	if len(targets) > 0 {
		for _, id := range targets {
			pm.peerQueuesLk.Lock()
			inst := pm.getOrCreate(id)
			pm.peerQueuesLk.Unlock()

			inst.pq.AddMessage(entries, from)
		}
		return
	}

	pm.peerQueuesLk.RLock()
	for _, inst := range pm.peerQueues {
		inst.pq.AddMessage(entries, from)
	}
	pm.peerQueuesLk.RUnlock()
}
// getOrCreate returns the pool entry for p, building one if there is none.
//
// A new entry gets a queue from the factory, has Startup called on it, and is
// stored with a connection count of zero. The function writes to peerQueues
// without locking, so the caller must already hold peerQueuesLk for writing.
func (pm *PeerManager) getOrCreate(p peer.ID) *peerQueueInstance {
	if existing, found := pm.peerQueues[p]; found {
		return existing
	}

	queue := pm.createPeerQueue(pm.ctx, p)
	queue.Startup()

	created := &peerQueueInstance{refcnt: 0, pq: queue}
	pm.peerQueues[p] = created
	return created
}