Skip to content
This repository has been archived by the owner on Feb 1, 2023. It is now read-only.

Commit

Permalink
fix: order of session broadcast wants
Browse files Browse the repository at this point in the history
  • Loading branch information
dirkmc committed Mar 12, 2020
1 parent df360b3 commit b83a609
Show file tree
Hide file tree
Showing 4 changed files with 118 additions and 42 deletions.
4 changes: 2 additions & 2 deletions internal/session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ func New(ctx context.Context,
periodicSearchDelay delay.D,
self peer.ID) *Session {
s := &Session{
sw: newSessionWants(),
sw: newSessionWants(broadcastLiveWantsLimit),
tickDelayReqs: make(chan time.Duration),
ctx: ctx,
wm: wm,
Expand Down Expand Up @@ -433,7 +433,7 @@ func (s *Session) wantBlocks(ctx context.Context, newks []cid.Cid) {
}

// No peers discovered yet, broadcast some want-haves
ks := s.sw.GetNextWants(broadcastLiveWantsLimit)
ks := s.sw.GetNextWants()
if len(ks) > 0 {
log.Infof("Ses%d: No peers - broadcasting %d want HAVE requests\n", s.id, len(ks))
s.wm.BroadcastWantHaves(ctx, s.id, ks)
Expand Down
11 changes: 9 additions & 2 deletions internal/session/session_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,12 +222,19 @@ func TestSessionFindMorePeers(t *testing.T) {
t.Fatal("Did not make second want request ")
}

// Verify a broadcast was made
// The session should keep broadcasting periodically until it receives a response
select {
case receivedWantReq := <-fwm.wantReqs:
if len(receivedWantReq.cids) < broadcastLiveWantsLimit {
if len(receivedWantReq.cids) != broadcastLiveWantsLimit {
t.Fatal("did not rebroadcast whole live list")
}
// Make sure the first block is not included because it has already
// been received
for _, c := range receivedWantReq.cids {
if c.Equals(cids[0]) {
t.Fatal("should not braodcast block that was already received")
}
}
case <-ctx.Done():
t.Fatal("Never rebroadcast want list")
}
Expand Down
78 changes: 44 additions & 34 deletions internal/session/sessionwants.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,19 +11,27 @@ import (
// sessionWants keeps track of which cids are waiting to be sent out, and which
// peers are "live" - ie, we've sent a request but haven't received a block yet
type sessionWants struct {
toFetch *cidQueue
liveWants map[cid.Cid]time.Time
// The wants that have not yet been sent out
toFetch *cidQueue
// Wants that have been sent but have not received a response
liveWants *cidQueue
// The time at which live wants were sent
sentAt map[cid.Cid]time.Time
// The maximum number of want-haves to send in a broadcast
broadcastLimit int
}

func newSessionWants() sessionWants {
func newSessionWants(broadcastLimit int) sessionWants {
return sessionWants{
toFetch: newCidQueue(),
liveWants: make(map[cid.Cid]time.Time),
toFetch: newCidQueue(),
liveWants: newCidQueue(),
sentAt: make(map[cid.Cid]time.Time),
broadcastLimit: broadcastLimit,
}
}

func (sw *sessionWants) String() string {
return fmt.Sprintf("%d pending / %d live", sw.toFetch.Len(), len(sw.liveWants))
return fmt.Sprintf("%d pending / %d live", sw.toFetch.Len(), sw.liveWants.Len())
}

// BlocksRequested is called when the client makes a request for blocks
Expand All @@ -33,20 +41,23 @@ func (sw *sessionWants) BlocksRequested(newWants []cid.Cid) {
}
}

// GetNextWants moves as many CIDs from the fetch queue to the live wants
// list as possible (given the limit). Returns the newly live wants.
func (sw *sessionWants) GetNextWants(limit int) []cid.Cid {
// GetNextWants is called when the session has not yet discovered peers with
// the blocks that it wants. It moves as many CIDs from the fetch queue to
// the live wants queue as possible (given the broadcast limit).
// Returns the newly live wants.
func (sw *sessionWants) GetNextWants() []cid.Cid {
now := time.Now()

// Move CIDs from fetch queue to the live wants queue (up to the limit)
currentLiveCount := len(sw.liveWants)
toAdd := limit - currentLiveCount
currentLiveCount := sw.liveWants.Len()
toAdd := sw.broadcastLimit - currentLiveCount

var live []cid.Cid
for ; toAdd > 0 && sw.toFetch.Len() > 0; toAdd-- {
c := sw.toFetch.Pop()
live = append(live, c)
sw.liveWants[c] = now
sw.liveWants.Push(c)
sw.sentAt[c] = now
}

return live
Expand All @@ -56,9 +67,10 @@ func (sw *sessionWants) GetNextWants(limit int) []cid.Cid {
func (sw *sessionWants) WantsSent(ks []cid.Cid) {
now := time.Now()
for _, c := range ks {
if _, ok := sw.liveWants[c]; !ok && sw.toFetch.Has(c) {
if _, ok := sw.sentAt[c]; !ok && sw.toFetch.Has(c) {
sw.toFetch.Remove(c)
sw.liveWants[c] = now
sw.liveWants.Push(c)
sw.sentAt[c] = now
}
}
}
Expand All @@ -78,13 +90,15 @@ func (sw *sessionWants) BlocksReceived(ks []cid.Cid) ([]cid.Cid, time.Duration)
if sw.isWanted(c) {
wanted = append(wanted, c)

sentAt, ok := sw.liveWants[c]
// Measure latency
sentAt, ok := sw.sentAt[c]
if ok && !sentAt.IsZero() {
totalLatency += now.Sub(sentAt)
}

// Remove the CID from the live wants / toFetch queue
delete(sw.liveWants, c)
sw.liveWants.Remove(c)
delete(sw.sentAt, c)
sw.toFetch.Remove(c)
}
}
Expand All @@ -93,16 +107,15 @@ func (sw *sessionWants) BlocksReceived(ks []cid.Cid) ([]cid.Cid, time.Duration)
}

// PrepareBroadcast saves the current time for each live want and returns the
// live want CIDs.
// live want CIDs up to the broadcast limit.
func (sw *sessionWants) PrepareBroadcast() []cid.Cid {
// TODO: Change this to return wants in order so that the session will
// send out Find Providers request for the first want
// (Note that maps return keys in random order)
now := time.Now()
live := make([]cid.Cid, 0, len(sw.liveWants))
for c := range sw.liveWants {
live = append(live, c)
sw.liveWants[c] = now
live := sw.liveWants.Cids()
if len(live) > sw.broadcastLimit {
live = live[:sw.broadcastLimit]
}
for _, c := range live {
sw.sentAt[c] = now
}
return live
}
Expand All @@ -116,21 +129,18 @@ func (sw *sessionWants) CancelPending(keys []cid.Cid) {

// LiveWants returns a list of live wants
func (sw *sessionWants) LiveWants() []cid.Cid {
live := make([]cid.Cid, 0, len(sw.liveWants))
for c := range sw.liveWants {
live = append(live, c)
}
return live
return sw.liveWants.Cids()
}

// RandomLiveWant returns a randomly selected live want
func (sw *sessionWants) RandomLiveWant() cid.Cid {
if len(sw.liveWants) == 0 {
if len(sw.sentAt) == 0 {
return cid.Cid{}
}

// picking a random live want
i := rand.Intn(len(sw.liveWants))
for k := range sw.liveWants {
i := rand.Intn(len(sw.sentAt))
for k := range sw.sentAt {
if i == 0 {
return k
}
Expand All @@ -141,12 +151,12 @@ func (sw *sessionWants) RandomLiveWant() cid.Cid {

// Has live wants indicates if there are any live wants
func (sw *sessionWants) HasLiveWants() bool {
return len(sw.liveWants) > 0
return sw.liveWants.Len() > 0
}

// Indicates whether the want is in either of the fetch or live queues
func (sw *sessionWants) isWanted(c cid.Cid) bool {
_, ok := sw.liveWants[c]
ok := sw.liveWants.Has(c)
if !ok {
ok = sw.toFetch.Has(c)
}
Expand Down
67 changes: 63 additions & 4 deletions internal/session/sessionwants_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
)

func TestEmptySessionWants(t *testing.T) {
sw := newSessionWants()
sw := newSessionWants(broadcastLiveWantsLimit)

// Expect these functions to return nothing on a new sessionWants
lws := sw.PrepareBroadcast()
Expand All @@ -29,7 +29,7 @@ func TestEmptySessionWants(t *testing.T) {
}

func TestSessionWants(t *testing.T) {
sw := newSessionWants()
sw := newSessionWants(5)
cids := testutil.GenerateCids(10)
others := testutil.GenerateCids(1)

Expand All @@ -42,7 +42,7 @@ func TestSessionWants(t *testing.T) {
// The first 5 cids should go move into the live queue
// toFetch Live
// 98765 43210
nextw := sw.GetNextWants(5)
nextw := sw.GetNextWants()
if len(nextw) != 5 {
t.Fatal("expected 5 next wants")
}
Expand Down Expand Up @@ -78,7 +78,7 @@ func TestSessionWants(t *testing.T) {
// Should move 2 wants from toFetch queue to live wants
// toFetch Live
// 987__ 65432
nextw = sw.GetNextWants(5)
nextw = sw.GetNextWants()
if len(nextw) != 2 {
t.Fatal("expected 2 next wants")
}
Expand Down Expand Up @@ -108,3 +108,62 @@ func TestSessionWants(t *testing.T) {
t.Fatal("expected 4 live wants")
}
}

func TestPrepareBroadcast(t *testing.T) {
sw := newSessionWants(3)
cids := testutil.GenerateCids(10)

// Add 6 new wants
// toFetch Live
// 543210
sw.BlocksRequested(cids[0:6])

// Get next wants with a limit of 3
// The first 3 cids should go move into the live queue
// toFetch Live
// 543 210
sw.GetNextWants()

// Broadcast should contain wants in order
for i := 0; i < 10; i++ {
ws := sw.PrepareBroadcast()
if len(ws) != 3 {
t.Fatal("should broadcast all live wants")
}
for idx, c := range ws {
if !c.Equals(cids[idx]) {
t.Fatal("broadcast should always return wants in order")
}
}
}

// One block received
// Remove a cid from the live queue
sw.BlocksReceived(cids[0:1])
// toFetch Live
// 543 21_

// Add 4 new wants
// toFetch Live
// 9876543 21
sw.BlocksRequested(cids[6:])

// 2 Wants sent
// toFetch Live
// 98765 4321
sw.WantsSent(cids[3:5])

// Broadcast should contain wants in order
cids = cids[1:]
for i := 0; i < 10; i++ {
ws := sw.PrepareBroadcast()
if len(ws) != 3 {
t.Fatal("should broadcast live wants up to limit", len(ws), len(cids))
}
for idx, c := range ws {
if !c.Equals(cids[idx]) {
t.Fatal("broadcast should always return wants in order")
}
}
}
}

0 comments on commit b83a609

Please sign in to comment.