Skip to content
This repository has been archived by the owner on Aug 2, 2021. It is now read-only.

stream: remove semaphor hack #2096

Merged
merged 1 commit into from
Feb 7, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 18 additions & 50 deletions network/stream/cursors_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,8 +103,6 @@ func TestNodesCorrectBinsDynamic(t *testing.T) {
nodeCount = 6
chunkCount = 500
)
binZeroPeers := 0

opts := &SyncSimServiceOptions{
InitialChunkCount: chunkCount,
}
Expand All @@ -114,69 +112,53 @@ func TestNodesCorrectBinsDynamic(t *testing.T) {
}, false)
defer sim.Close()

idPivot, err := sim.AddNode()
_, err := sim.AddNodesAndConnectStar(2)
if err != nil {
t.Fatal(err)
}

if len(sim.UpNodeIDs()) != 1 {
t.Fatal("node not started")
nodeIDs := sim.UpNodeIDs()
if len(nodeIDs) != 2 {
t.Fatal("not enough nodes up")
}

pivotKademlia := nodeKademlia(sim, idPivot)
pivotSyncer := nodeRegistry(sim, idPivot)
waitForCursors(t, sim, nodeIDs[0], nodeIDs[1], true)
waitForCursors(t, sim, nodeIDs[1], nodeIDs[0], true)

for j := 1; j <= nodeCount; j++ {
for j := 2; j <= nodeCount; j++ {
// append a node to the simulation
id, err := sim.AddNode()
id, err := sim.AddNodes(1)
if err != nil {
t.Fatal(err)
}
err = sim.Net.ConnectNodesStar([]enode.ID{id}, idPivot)
err = sim.Net.ConnectNodesStar(id, nodeIDs[0])
if err != nil {
t.Fatal(err)
}
nodeIDs := sim.UpNodeIDs()
if len(nodeIDs) != j+1 {
t.Fatalf("not enough nodes up. got %d, want %d", len(nodeIDs), j+1)
}
idPivot := nodeIDs[0]

otherKad := sim.MustNodeItem(id, simulation.BucketKeyKademlia).(*network.Kademlia)
po := chunk.Proximity(otherKad.BaseAddr(), pivotKademlia.BaseAddr())
if po == 0 {
binZeroPeers++
if binZeroPeers <= maxBinZeroSyncPeers {
waitForCursors(t, sim, idPivot, nodeIDs[j], true)
waitForCursors(t, sim, nodeIDs[j], idPivot, true)
} else {
// wait for the peer to get created in the protocol
waitForPeer(t, sim, idPivot, id)
}
} else {
waitForCursors(t, sim, idPivot, nodeIDs[j], true)
waitForCursors(t, sim, nodeIDs[j], idPivot, true)
}
waitForCursors(t, sim, idPivot, nodeIDs[j], true)
waitForCursors(t, sim, nodeIDs[j], idPivot, true)

binZeroRun := 0
for i := 1; i < len(nodeIDs); i++ {
pivotSyncer := nodeRegistry(sim, idPivot)
pivotKademlia := nodeKademlia(sim, idPivot)
pivotDepth := uint(pivotKademlia.NeighbourhoodDepth())

for i := 1; i < j; i++ {
idOther := nodeIDs[i]
otherKademlia := sim.MustNodeItem(idOther, simulation.BucketKeyKademlia).(*network.Kademlia)

po := chunk.Proximity(otherKademlia.BaseAddr(), pivotKademlia.BaseAddr())
pivotCursors := pivotSyncer.getPeer(idOther).getCursorsCopy()
pivotDepth := uint(pivotKademlia.NeighbourhoodDepth())
if po == 0 {
binZeroRun++
if binZeroRun > maxBinZeroSyncPeers {
continue
}
}

// check that the pivot node is interested just in bins >= depth
if po >= int(pivotDepth) {
othersBins := nodeInitialBinIndexes(sim, idOther)
if err := compareNodeBinsToStreamsWithDepth(t, pivotCursors, othersBins, pivotDepth); err != nil {
t.Error(i, j, po, err)
t.Error(err)
}
}
}
Expand Down Expand Up @@ -372,20 +354,6 @@ func setupReestablishCursorsSimulation(t *testing.T, tagetPO int) (sim *simulati
return
}

func waitForPeer(t *testing.T, sim *simulation.Simulation, pivotEnode, lookupEnode enode.ID) {
for i := 0; i < 1000; i++ { // 10s total wait
time.Sleep(5 * time.Millisecond)
s, ok := sim.Service(serviceNameStream, pivotEnode).(*Registry)
if !ok {
continue
}
p := s.getPeer(lookupEnode)
if p == nil {
continue
}
}
}

// waitForCursors checks if the pivot node has some cursors or not
// by periodically checking for them.
func waitForCursors(t *testing.T, sim *simulation.Simulation, pivotEnode, lookupEnode enode.ID, wantSome bool) {
Expand Down
22 changes: 3 additions & 19 deletions network/stream/sync_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,9 @@ import (
)

const (
syncStreamName = "SYNC"
cacheCapacity = 10000
setCacheCapacity = 80000 // 80000 * 32 = ~2.5mb mem footprint, 80K chunks ~=330 megs of data
maxBinZeroSyncPeers = 3
syncStreamName = "SYNC"
cacheCapacity = 10000
setCacheCapacity = 80000 // 80000 * 32 = ~2.5mb mem footprint, 80K chunks ~=330 megs of data
)

var (
Expand All @@ -59,7 +58,6 @@ type syncProvider struct {
setCacheMtx sync.RWMutex // set cache mutex
setCache *lru.Cache // cache to reduce load on localstore to not set the same chunk as synced
logger log.Logger // logger that appends the base address to loglines
binZeroSem chan struct{} // semaphore to limit number of syncing peers on bin 0
}

// NewSyncProvider creates a new sync provider that is used by the stream protocol to sink data and control its behaviour
Expand All @@ -86,7 +84,6 @@ func NewSyncProvider(ns *storage.NetStore, kad *network.Kademlia, baseAddr *netw
cache: c,
setCache: sc,
logger: log.NewBaseAddressLogger(baseAddr.ShortString()),
binZeroSem: make(chan struct{}, maxBinZeroSyncPeers),
}
}

Expand Down Expand Up @@ -317,24 +314,11 @@ func (s *syncProvider) InitPeer(p *Peer) {
case <-timer.C:
case <-p.quit:
return
case <-s.quit:
return
}

po := chunk.Proximity(p.BzzAddr.Over(), s.kad.BaseAddr())
depth := s.kad.NeighbourhoodDepth()

if po == 0 {
select {
case s.binZeroSem <- struct{}{}:
case <-p.quit:
return
case <-s.quit:
return
}
defer func() { <-s.binZeroSem }()

}
p.logger.Debug("update syncing subscriptions: initial", "po", po, "depth", depth)

subBins, quitBins := syncSubscriptionsDiff(po, -1, depth, s.kad.MaxProxDisplay, s.syncBinsOnlyWithinDepth)
Expand Down