Skip to content
This repository has been archived by the owner on Feb 1, 2023. It is now read-only.

Commit

Permalink
Merge pull request #291 from ipfs/fix/session-broadcast-wants
Browse files Browse the repository at this point in the history
Fix order of session broadcast wants
  • Loading branch information
Stebalien authored Mar 12, 2020
2 parents 5c18cf5 + 73261ec commit cd14e70
Show file tree
Hide file tree
Showing 4 changed files with 149 additions and 24 deletions.
4 changes: 2 additions & 2 deletions internal/session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ func New(ctx context.Context,
periodicSearchDelay delay.D,
self peer.ID) *Session {
s := &Session{
sw: newSessionWants(),
sw: newSessionWants(broadcastLiveWantsLimit),
tickDelayReqs: make(chan time.Duration),
ctx: ctx,
wm: wm,
Expand Down Expand Up @@ -433,7 +433,7 @@ func (s *Session) wantBlocks(ctx context.Context, newks []cid.Cid) {
}

// No peers discovered yet, broadcast some want-haves
ks := s.sw.GetNextWants(broadcastLiveWantsLimit)
ks := s.sw.GetNextWants()
if len(ks) > 0 {
log.Infof("Ses%d: No peers - broadcasting %d want HAVE requests\n", s.id, len(ks))
s.wm.BroadcastWantHaves(ctx, s.id, ks)
Expand Down
11 changes: 9 additions & 2 deletions internal/session/session_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,12 +222,19 @@ func TestSessionFindMorePeers(t *testing.T) {
t.Fatal("Did not make second want request ")
}

// Verify a broadcast was made
// The session should keep broadcasting periodically until it receives a response
select {
case receivedWantReq := <-fwm.wantReqs:
if len(receivedWantReq.cids) < broadcastLiveWantsLimit {
if len(receivedWantReq.cids) != broadcastLiveWantsLimit {
t.Fatal("did not rebroadcast whole live list")
}
// Make sure the first block is not included because it has already
// been received
for _, c := range receivedWantReq.cids {
if c.Equals(cids[0]) {
t.Fatal("should not braodcast block that was already received")
}
}
case <-ctx.Done():
t.Fatal("Never rebroadcast want list")
}
Expand Down
71 changes: 55 additions & 16 deletions internal/session/sessionwants.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,29 @@ import (
cid "github.com/ipfs/go-cid"
)

// liveWantsOrder and liveWants will get out of sync as blocks are received.
// This constant is the maximum amount to allow them to be out of sync before
// cleaning up the ordering array.
const liveWantsOrderGCLimit = 32

// sessionWants keeps track of which cids are waiting to be sent out, and which
// peers are "live" - ie, we've sent a request but haven't received a block yet
type sessionWants struct {
toFetch *cidQueue
// The wants that have not yet been sent out
toFetch *cidQueue
// Wants that have been sent but have not received a response
liveWants map[cid.Cid]time.Time
// The order in which wants were requested
liveWantsOrder []cid.Cid
// The maximum number of want-haves to send in a broadcast
broadcastLimit int
}

func newSessionWants() sessionWants {
func newSessionWants(broadcastLimit int) sessionWants {
return sessionWants{
toFetch: newCidQueue(),
liveWants: make(map[cid.Cid]time.Time),
toFetch: newCidQueue(),
liveWants: make(map[cid.Cid]time.Time),
broadcastLimit: broadcastLimit,
}
}

Expand All @@ -33,19 +45,23 @@ func (sw *sessionWants) BlocksRequested(newWants []cid.Cid) {
}
}

// GetNextWants moves as many CIDs from the fetch queue to the live wants
// list as possible (given the limit). Returns the newly live wants.
func (sw *sessionWants) GetNextWants(limit int) []cid.Cid {
// GetNextWants is called when the session has not yet discovered peers with
// the blocks that it wants. It moves as many CIDs from the fetch queue to
// the live wants queue as possible (given the broadcast limit).
// Returns the newly live wants.
func (sw *sessionWants) GetNextWants() []cid.Cid {
now := time.Now()

// Move CIDs from fetch queue to the live wants queue (up to the limit)
// Move CIDs from fetch queue to the live wants queue (up to the broadcast
// limit)
currentLiveCount := len(sw.liveWants)
toAdd := limit - currentLiveCount
toAdd := sw.broadcastLimit - currentLiveCount

var live []cid.Cid
for ; toAdd > 0 && sw.toFetch.Len() > 0; toAdd-- {
c := sw.toFetch.Pop()
live = append(live, c)
sw.liveWantsOrder = append(sw.liveWantsOrder, c)
sw.liveWants[c] = now
}

Expand All @@ -58,6 +74,7 @@ func (sw *sessionWants) WantsSent(ks []cid.Cid) {
for _, c := range ks {
if _, ok := sw.liveWants[c]; !ok && sw.toFetch.Has(c) {
sw.toFetch.Remove(c)
sw.liveWantsOrder = append(sw.liveWantsOrder, c)
sw.liveWants[c] = now
}
}
Expand All @@ -73,11 +90,13 @@ func (sw *sessionWants) BlocksReceived(ks []cid.Cid) ([]cid.Cid, time.Duration)
return wanted, totalLatency
}

// Filter for blocks that were actually wanted (as opposed to duplicates)
now := time.Now()
for _, c := range ks {
if sw.isWanted(c) {
wanted = append(wanted, c)

// Measure latency
sentAt, ok := sw.liveWants[c]
if ok && !sentAt.IsZero() {
totalLatency += now.Sub(sentAt)
Expand All @@ -89,21 +108,39 @@ func (sw *sessionWants) BlocksReceived(ks []cid.Cid) ([]cid.Cid, time.Duration)
}
}

// If the live wants ordering array is a long way out of sync with the
// live wants map, clean up the ordering array
if len(sw.liveWantsOrder)-len(sw.liveWants) > liveWantsOrderGCLimit {
cleaned := sw.liveWantsOrder[:0]
for _, c := range sw.liveWantsOrder {
if _, ok := sw.liveWants[c]; ok {
cleaned = append(cleaned, c)
}
}
sw.liveWantsOrder = cleaned
}

return wanted, totalLatency
}

// PrepareBroadcast saves the current time for each live want and returns the
// live want CIDs.
// live want CIDs up to the broadcast limit.
func (sw *sessionWants) PrepareBroadcast() []cid.Cid {
// TODO: Change this to return wants in order so that the session will
// send out Find Providers request for the first want
// (Note that maps return keys in random order)
now := time.Now()
live := make([]cid.Cid, 0, len(sw.liveWants))
for c := range sw.liveWants {
live = append(live, c)
sw.liveWants[c] = now
for _, c := range sw.liveWantsOrder {
if _, ok := sw.liveWants[c]; ok {
// No response was received for the want, so reset the sent time
// to now as we're about to broadcast
sw.liveWants[c] = now

live = append(live, c)
if len(live) == sw.broadcastLimit {
break
}
}
}

return live
}

Expand All @@ -120,9 +157,11 @@ func (sw *sessionWants) LiveWants() []cid.Cid {
for c := range sw.liveWants {
live = append(live, c)
}

return live
}

// RandomLiveWant returns a randomly selected live want
func (sw *sessionWants) RandomLiveWant() cid.Cid {
if len(sw.liveWants) == 0 {
return cid.Cid{}
Expand Down
87 changes: 83 additions & 4 deletions internal/session/sessionwants_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
)

func TestEmptySessionWants(t *testing.T) {
sw := newSessionWants()
sw := newSessionWants(broadcastLiveWantsLimit)

// Expect these functions to return nothing on a new sessionWants
lws := sw.PrepareBroadcast()
Expand All @@ -29,7 +29,7 @@ func TestEmptySessionWants(t *testing.T) {
}

func TestSessionWants(t *testing.T) {
sw := newSessionWants()
sw := newSessionWants(5)
cids := testutil.GenerateCids(10)
others := testutil.GenerateCids(1)

Expand All @@ -42,7 +42,7 @@ func TestSessionWants(t *testing.T) {
// The first 5 cids should go move into the live queue
// toFetch Live
// 98765 43210
nextw := sw.GetNextWants(5)
nextw := sw.GetNextWants()
if len(nextw) != 5 {
t.Fatal("expected 5 next wants")
}
Expand Down Expand Up @@ -78,7 +78,7 @@ func TestSessionWants(t *testing.T) {
// Should move 2 wants from toFetch queue to live wants
// toFetch Live
// 987__ 65432
nextw = sw.GetNextWants(5)
nextw = sw.GetNextWants()
if len(nextw) != 2 {
t.Fatal("expected 2 next wants")
}
Expand Down Expand Up @@ -108,3 +108,82 @@ func TestSessionWants(t *testing.T) {
t.Fatal("expected 4 live wants")
}
}

func TestPrepareBroadcast(t *testing.T) {
sw := newSessionWants(3)
cids := testutil.GenerateCids(10)

// Add 6 new wants
// toFetch Live
// 543210
sw.BlocksRequested(cids[:6])

// Get next wants with a limit of 3
// The first 3 cids should go move into the live queue
// toFetch Live
// 543 210
sw.GetNextWants()

// Broadcast should contain wants in order
for i := 0; i < 10; i++ {
ws := sw.PrepareBroadcast()
if len(ws) != 3 {
t.Fatal("should broadcast all live wants")
}
for idx, c := range ws {
if !c.Equals(cids[idx]) {
t.Fatal("broadcast should always return wants in order")
}
}
}

// One block received
// Remove a cid from the live queue
sw.BlocksReceived(cids[:1])
// toFetch Live
// 543 21_

// Add 4 new wants
// toFetch Live
// 9876543 21
sw.BlocksRequested(cids[6:])

// 2 Wants sent
// toFetch Live
// 98765 4321
sw.WantsSent(cids[3:5])

// Broadcast should contain wants in order
cids = cids[1:]
for i := 0; i < 10; i++ {
ws := sw.PrepareBroadcast()
if len(ws) != 3 {
t.Fatal("should broadcast live wants up to limit", len(ws), len(cids))
}
for idx, c := range ws {
if !c.Equals(cids[idx]) {
t.Fatal("broadcast should always return wants in order")
}
}
}
}

// Test that even after GC broadcast returns correct wants
func TestPrepareBroadcastAfterGC(t *testing.T) {
sw := newSessionWants(5)
cids := testutil.GenerateCids(liveWantsOrderGCLimit * 2)

sw.BlocksRequested(cids)

// Trigger a sessionWants internal GC of the live wants
sw.BlocksReceived(cids[:liveWantsOrderGCLimit+1])
cids = cids[:liveWantsOrderGCLimit+1]

// Broadcast should contain wants in order
ws := sw.PrepareBroadcast()
for i, c := range ws {
if !c.Equals(cids[i]) {
t.Fatal("broadcast should always return wants in order")
}
}
}

0 comments on commit cd14e70

Please sign in to comment.