messagequeue.go
package messagequeue
import (
"context"
"math"
"sync"
"time"
debounce "github.com/bep/debounce"
bsmsg "github.com/ipfs/go-bitswap/message"
pb "github.com/ipfs/go-bitswap/message/pb"
bsnet "github.com/ipfs/go-bitswap/network"
bswl "github.com/ipfs/go-bitswap/wantlist"
cid "github.com/ipfs/go-cid"
logging "github.com/ipfs/go-log"
peer "github.com/libp2p/go-libp2p-core/peer"
)
var log = logging.Logger("bitswap")
const (
defaultRebroadcastInterval = 30 * time.Second
// maxRetries is the number of times to attempt to send a message before
// giving up
maxRetries = 10
// maxMessageSize is the maximum message size in bytes
maxMessageSize = 1024 * 1024 * 2
// sendErrorBackoff is the time to wait before retrying to connect after
// an error when trying to send a message
sendErrorBackoff = 100 * time.Millisecond
// maxPriority is the max priority as defined by the bitswap protocol
maxPriority = math.MaxInt32
// sendMessageDebounce is the debounce duration when calling sendMessage()
sendMessageDebounce = time.Millisecond
)
// MessageNetwork is any network that can connect peers and generate a message
// sender.
type MessageNetwork interface {
ConnectTo(context.Context, peer.ID) error
NewMessageSender(context.Context, peer.ID) (bsnet.MessageSender, error)
Self() peer.ID
}
// MessageQueue implements a queue of want messages to send to peers.
type MessageQueue struct {
ctx context.Context
p peer.ID
network MessageNetwork
maxMessageSize int
sendErrorBackoff time.Duration
signalWorkReady func()
outgoingWork chan struct{}
done chan struct{}
// Take lock whenever any of these variables are modified
wllock sync.Mutex
bcstWants recallWantlist
peerWants recallWantlist
cancels *cid.Set
priority int
// Don't touch any of these variables outside of the run loop
sender bsnet.MessageSender
rebroadcastIntervalLk sync.RWMutex
rebroadcastInterval time.Duration
rebroadcastTimer *time.Timer
}
// recallWantlist keeps a list of pending wants, and a list of all wants that
// have ever been requested
type recallWantlist struct {
// The list of all wants that have been requested, including wants that
// have been sent and wants that have not yet been sent
allWants *bswl.Wantlist
// The list of wants that have not yet been sent
pending *bswl.Wantlist
}
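// newRecallWantList returns a recallWantlist with empty pending and allWants lists.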
func newRecallWantList() recallWantlist {
return recallWantlist{
allWants: bswl.New(),
pending: bswl.New(),
}
}
// Add a want to both the pending list and the list of all wants
func (r *recallWantlist) Add(c cid.Cid, priority int, wtype pb.Message_Wantlist_WantType) {
r.allWants.Add(c, priority, wtype)
r.pending.Add(c, priority, wtype)
}
// Remove wants from both the pending list and the list of all wants
func (r *recallWantlist) Remove(c cid.Cid) {
r.allWants.Remove(c)
r.pending.Remove(c)
}
// Remove wants by type from both the pending list and the list of all wants
func (r *recallWantlist) RemoveType(c cid.Cid, wtype pb.Message_Wantlist_WantType) {
r.allWants.RemoveType(c, wtype)
r.pending.RemoveType(c, wtype)
}
// New creates a new MessageQueue.
func New(ctx context.Context, p peer.ID, network MessageNetwork) *MessageQueue {
return newMessageQueue(ctx, p, network, maxMessageSize, sendErrorBackoff)
}
// This constructor is used by the tests
func newMessageQueue(ctx context.Context, p peer.ID, network MessageNetwork, maxMsgSize int, sendErrorBackoff time.Duration) *MessageQueue {
mq := &MessageQueue{
ctx: ctx,
p: p,
network: network,
maxMessageSize: maxMsgSize,
bcstWants: newRecallWantList(),
peerWants: newRecallWantList(),
cancels: cid.NewSet(),
outgoingWork: make(chan struct{}, 1),
done: make(chan struct{}),
rebroadcastInterval: defaultRebroadcastInterval,
sendErrorBackoff: sendErrorBackoff,
priority: maxPriority,
}
// Apply debounce to the work ready signal (which triggers sending a message)
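// Signals that arrive within sendMessageDebounce of each other collapse into a single send.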
debounced := debounce.New(sendMessageDebounce)
mq.signalWorkReady = func() { debounced(mq.onWorkReady) }
return mq
}
// Add want-haves that are part of a broadcast to all connected peers
func (mq *MessageQueue) AddBroadcastWantHaves(wantHaves []cid.Cid) {
if len(wantHaves) == 0 {
return
}
mq.wllock.Lock()
defer mq.wllock.Unlock()
for _, c := range wantHaves {
mq.bcstWants.Add(c, mq.priority, pb.Message_Wantlist_Have)
mq.priority--
// We're adding a want-have for the cid, so clear any pending cancel
// for the cid
mq.cancels.Remove(c)
}
// Schedule a message send
mq.signalWorkReady()
}
// Add want-haves and want-blocks for the peer for this message queue.
func (mq *MessageQueue) AddWants(wantBlocks []cid.Cid, wantHaves []cid.Cid) {
if len(wantBlocks) == 0 && len(wantHaves) == 0 {
return
}
mq.wllock.Lock()
defer mq.wllock.Unlock()
for _, c := range wantHaves {
mq.peerWants.Add(c, mq.priority, pb.Message_Wantlist_Have)
mq.priority--
// We're adding a want-have for the cid, so clear any pending cancel
// for the cid
mq.cancels.Remove(c)
}
for _, c := range wantBlocks {
mq.peerWants.Add(c, mq.priority, pb.Message_Wantlist_Block)
mq.priority--
// We're adding a want-block for the cid, so clear any pending cancel
// for the cid
mq.cancels.Remove(c)
}
// Schedule a message send
mq.signalWorkReady()
}
// Add cancel messages for the given keys.
func (mq *MessageQueue) AddCancels(cancelKs []cid.Cid) {
if len(cancelKs) == 0 {
return
}
mq.wllock.Lock()
defer mq.wllock.Unlock()
for _, c := range cancelKs {
mq.bcstWants.Remove(c)
mq.peerWants.Remove(c)
mq.cancels.Add(c)
}
// Schedule a message send
mq.signalWorkReady()
}
// SetRebroadcastInterval sets a new interval on which to rebroadcast the full wantlist
func (mq *MessageQueue) SetRebroadcastInterval(delay time.Duration) {
mq.rebroadcastIntervalLk.Lock()
mq.rebroadcastInterval = delay
if mq.rebroadcastTimer != nil {
mq.rebroadcastTimer.Reset(delay)
}
mq.rebroadcastIntervalLk.Unlock()
}
// Startup starts the processing of messages and rebroadcasting.
func (mq *MessageQueue) Startup() {
mq.rebroadcastIntervalLk.RLock()
mq.rebroadcastTimer = time.NewTimer(mq.rebroadcastInterval)
mq.rebroadcastIntervalLk.RUnlock()
go mq.runQueue()
}
// Shutdown stops the processing of messages for a message queue.
func (mq *MessageQueue) Shutdown() {
close(mq.done)
}
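// exampleUsage is an illustrative sketch of how a caller might drive a
// MessageQueue; "network" is assumed to be some concrete MessageNetwork
// implementation supplied by the caller, and the cid slices are placeholders.
func exampleUsage(ctx context.Context, p peer.ID, network MessageNetwork, wantBlocks, wantHaves, cancelKs []cid.Cid) {
	mq := New(ctx, p, network)
	mq.Startup()                        // start the run loop and the rebroadcast timer
	mq.AddBroadcastWantHaves(wantHaves) // queue broadcast want-haves
	mq.AddWants(wantBlocks, wantHaves)  // queue peer-specific want-blocks / want-haves
	mq.AddCancels(cancelKs)             // best-effort cancels for keys no longer needed
	mq.Shutdown()                       // stop the run loop and close the sender
}
// runQueue is the message queue's run loop: it waits for work signals,
// rebroadcast timer ticks and shutdown, and sends messages accordingly.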
func (mq *MessageQueue) runQueue() {
for {
select {
case <-mq.rebroadcastTimer.C:
mq.rebroadcastWantlist()
case <-mq.outgoingWork:
mq.sendIfReady()
case <-mq.done:
if mq.sender != nil {
mq.sender.Close()
}
return
case <-mq.ctx.Done():
if mq.sender != nil {
_ = mq.sender.Reset()
}
return
}
}
}
// Periodically resend the list of wants to the peer
func (mq *MessageQueue) rebroadcastWantlist() {
mq.rebroadcastIntervalLk.RLock()
mq.rebroadcastTimer.Reset(mq.rebroadcastInterval)
mq.rebroadcastIntervalLk.RUnlock()
// If some wants were transferred from the rebroadcast list
if mq.transferRebroadcastWants() {
// Send them out
mq.sendMessage()
}
}
// Transfer wants from the rebroadcast lists into the pending lists.
func (mq *MessageQueue) transferRebroadcastWants() bool {
mq.wllock.Lock()
defer mq.wllock.Unlock()
// Check if there are any wants to rebroadcast
if mq.bcstWants.allWants.Len() == 0 && mq.peerWants.allWants.Len() == 0 {
return false
}
// Copy all wants into pending wants lists
mq.bcstWants.pending.Absorb(mq.bcstWants.allWants)
mq.peerWants.pending.Absorb(mq.peerWants.allWants)
return true
}
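// onWorkReady signals the run loop that there is a message to send. The send
// on outgoingWork is non-blocking, so signals that arrive while one is already
// pending are coalesced.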
func (mq *MessageQueue) onWorkReady() {
select {
case mq.outgoingWork <- struct{}{}:
default:
}
}
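// sendIfReady sends a message if there are any pending wants or cancels.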
func (mq *MessageQueue) sendIfReady() {
if mq.hasPendingWork() {
mq.sendMessage()
}
}
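// sendMessage opens a sender to the peer if necessary, converts the pending
// wants and cancels into a Bitswap message and tries to send it, retrying up
// to maxRetries times. If only part of the pending work fits into the message,
// it signals the run loop to send the remainder.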
func (mq *MessageQueue) sendMessage() {
err := mq.initializeSender()
if err != nil {
log.Infof("cant open message sender to peer %s: %s", mq.p, err)
// TODO: cant connect, what now?
// TODO: should we stop using this connection and clear the want list
// to avoid using up memory?
return
}
// Convert want lists to a Bitswap Message
message, onSent := mq.extractOutgoingMessage(mq.sender.SupportsHave())
if message == nil || message.Empty() {
return
}
// mq.logOutgoingMessage(message)
// Try to send this message repeatedly
for i := 0; i < maxRetries; i++ {
if mq.attemptSendAndRecovery(message) {
// We were able to send successfully.
onSent()
// If the message was too big and only a subset of wants could be
// sent, schedule sending the rest of the wants in the next
// iteration of the event loop.
if mq.hasPendingWork() {
mq.signalWorkReady()
}
return
}
}
}
// func (mq *MessageQueue) logOutgoingMessage(msg bsmsg.BitSwapMessage) {
// entries := msg.Wantlist()
// for _, e := range entries {
// if e.Cancel {
// if e.WantType == pb.Message_Wantlist_Have {
// log.Debugf("send %s->%s: cancel-have %s\n", lu.P(mq.network.Self()), lu.P(mq.p), lu.C(e.Cid))
// } else {
// log.Debugf("send %s->%s: cancel-block %s\n", lu.P(mq.network.Self()), lu.P(mq.p), lu.C(e.Cid))
// }
// } else {
// if e.WantType == pb.Message_Wantlist_Have {
// log.Debugf("send %s->%s: want-have %s\n", lu.P(mq.network.Self()), lu.P(mq.p), lu.C(e.Cid))
// } else {
// log.Debugf("send %s->%s: want-block %s\n", lu.P(mq.network.Self()), lu.P(mq.p), lu.C(e.Cid))
// }
// }
// }
// }
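// hasPendingWork reports whether there are any pending wants or cancels to send.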
func (mq *MessageQueue) hasPendingWork() bool {
mq.wllock.Lock()
defer mq.wllock.Unlock()
return mq.bcstWants.pending.Len() > 0 || mq.peerWants.pending.Len() > 0 || mq.cancels.Len() > 0
}
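// extractOutgoingMessage builds a Bitswap message from the pending broadcast
// wants, peer wants and cancels, stopping once maxMessageSize is reached. It
// returns the message together with a callback that removes the sent entries
// from the pending lists; the callback should be invoked only after the
// message has been sent successfully.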
func (mq *MessageQueue) extractOutgoingMessage(supportsHave bool) (bsmsg.BitSwapMessage, func()) {
// Create a new message
msg := bsmsg.New(false)
mq.wllock.Lock()
defer mq.wllock.Unlock()
// Get broadcast and regular wantlist entries
bcstEntries := mq.bcstWants.pending.SortedEntries()
peerEntries := mq.peerWants.pending.SortedEntries()
// Size of the message so far
msgSize := 0
// Add each broadcast want-have to the message
for i := 0; i < len(bcstEntries) && msgSize < mq.maxMessageSize; i++ {
// Broadcast wants are sent as want-have
wantType := pb.Message_Wantlist_Have
// If the remote peer doesn't support HAVE / DONT_HAVE messages,
// send a want-block instead
if !supportsHave {
wantType = pb.Message_Wantlist_Block
}
e := bcstEntries[i]
msgSize += msg.AddEntry(e.Cid, e.Priority, wantType, false)
}
// Add each regular want-have / want-block to the message
for i := 0; i < len(peerEntries) && msgSize < mq.maxMessageSize; i++ {
e := peerEntries[i]
// If the remote peer doesn't support HAVE / DONT_HAVE messages,
// don't send want-haves (only send want-blocks)
if !supportsHave && e.WantType == pb.Message_Wantlist_Have {
mq.peerWants.RemoveType(e.Cid, pb.Message_Wantlist_Have)
} else {
msgSize += msg.AddEntry(e.Cid, e.Priority, e.WantType, true)
}
}
// Add each cancel to the message
cancels := mq.cancels.Keys()
for i := 0; i < len(cancels) && msgSize < mq.maxMessageSize; i++ {
c := cancels[i]
msgSize += msg.Cancel(c)
// Clear the cancel - we make a best effort to let peers know about
// cancels but won't save them to resend if there's a failure.
mq.cancels.Remove(c)
}
// Called when the message has been successfully sent.
// Remove the sent keys from the broadcast and regular wantlists.
onSent := func() {
mq.wllock.Lock()
defer mq.wllock.Unlock()
for _, e := range msg.Wantlist() {
mq.bcstWants.pending.Remove(e.Cid)
mq.peerWants.pending.RemoveType(e.Cid, e.WantType)
}
}
return msg, onSent
}
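// initializeSender lazily opens a message sender to the peer, reusing the
// existing one if it has already been opened.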
func (mq *MessageQueue) initializeSender() error {
if mq.sender != nil {
return nil
}
nsender, err := openSender(mq.ctx, mq.network, mq.p)
if err != nil {
return err
}
mq.sender = nsender
return nil
}
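// attemptSendAndRecovery tries to send the message once. It returns true if no
// further attempt should be made (the send succeeded, the queue is shutting
// down, or the sender could not be re-opened) and false if the caller should
// retry.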
func (mq *MessageQueue) attemptSendAndRecovery(message bsmsg.BitSwapMessage) bool {
err := mq.sender.SendMsg(mq.ctx, message)
if err == nil {
return true
}
log.Infof("bitswap send error: %s", err)
_ = mq.sender.Reset()
mq.sender = nil
select {
case <-mq.done:
return true
case <-mq.ctx.Done():
return true
case <-time.After(mq.sendErrorBackoff):
// wait sendErrorBackoff (100ms by default) in case disconnect notifications are still propagating
log.Warning("SendMsg errored but neither 'done' nor context.Done() were set")
}
err = mq.initializeSender()
if err != nil {
log.Infof("couldnt open sender again after SendMsg(%s) failed: %s", mq.p, err)
return true
}
// TODO: Is this the same instance for the remote peer?
// If it's not, we should resend our entire wantlist to them
/*
if mq.sender.InstanceID() != mq.lastSeenInstanceID {
wlm = mq.getFullWantlistMessage()
}
*/
return false
}
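// openSender connects to the peer and returns a new message sender for it.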
func openSender(ctx context.Context, network MessageNetwork, p peer.ID) (bsnet.MessageSender, error) {
// Allow ten minutes for connections. This includes looking them up in the
// DHT, dialing them, and handshaking.
conctx, cancel := context.WithTimeout(ctx, time.Minute*10)
defer cancel()
err := network.ConnectTo(conctx, p)
if err != nil {
return nil, err
}
nsender, err := network.NewMessageSender(ctx, p)
if err != nil {
return nil, err
}
return nsender, nil
}