Skip to content
This repository has been archived by the owner on Feb 1, 2023. It is now read-only.

Use a realistic test network w/ DHT #136

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
146 changes: 122 additions & 24 deletions benchmarks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (

"github.com/ipfs/go-bitswap/testutil"
blocks "github.com/ipfs/go-block-format"
mocknet "github.com/libp2p/go-libp2p/p2p/net/mock"

bitswap "github.com/ipfs/go-bitswap"
bssession "github.com/ipfs/go-bitswap/session"
Expand All @@ -24,6 +25,8 @@ import (
mockrouting "github.com/ipfs/go-ipfs-routing/mock"
)

type topologyFunc func(b *testing.B, instances []testinstance.Instance)

type fetchFunc func(b *testing.B, bs *bitswap.Bitswap, ks []cid.Cid)

type distFunc func(b *testing.B, provs []testinstance.Instance, blocks []blocks.Block)
Expand Down Expand Up @@ -152,12 +155,49 @@ func BenchmarkDupsManyNodesRealWorldNetwork(b *testing.B) {
ioutil.WriteFile("tmp/rw-benchmark.json", out, 0666)
}

func BenchmarkDupsManyNodesRealWorldNetworkWithRealDHT(b *testing.B) {
benchmarkLog = nil
benchmarkSeed, err := strconv.ParseInt(os.Getenv("BENCHMARK_SEED"), 10, 64)
var randomGen *rand.Rand = nil
if err == nil {
randomGen = rand.New(rand.NewSource(benchmarkSeed))
}

fastNetworkDelayGenerator := tn.InternetLatencyDelayGenerator(
mediumSpeed-fastSpeed, slowSpeed-fastSpeed,
0.0, 0.0, distribution, randomGen)
fastNetworkDelay := delay.Delay(fastSpeed, fastNetworkDelayGenerator)
fastBandwidthGenerator := tn.VariableRateLimitGenerator(fastBandwidth, fastBandwidthDeviation, randomGen)
averageNetworkDelayGenerator := tn.InternetLatencyDelayGenerator(
mediumSpeed-fastSpeed, slowSpeed-fastSpeed,
0.3, 0.3, distribution, randomGen)
averageNetworkDelay := delay.Delay(fastSpeed, averageNetworkDelayGenerator)
averageBandwidthGenerator := tn.VariableRateLimitGenerator(mediumBandwidth, mediumBandwidthDeviation, randomGen)
slowNetworkDelayGenerator := tn.InternetLatencyDelayGenerator(
mediumSpeed-fastSpeed, superSlowSpeed-fastSpeed,
0.3, 0.3, distribution, randomGen)
slowNetworkDelay := delay.Delay(fastSpeed, slowNetworkDelayGenerator)
slowBandwidthGenerator := tn.VariableRateLimitGenerator(slowBandwidth, slowBandwidthDeviation, randomGen)

b.Run("10Nodes-RandomRandom-BigBatch-FastNetwork", func(b *testing.B) {
subtestDistributeAndFetchWithRealDHT(b, 20, 10, fastNetworkDelay, fastBandwidthGenerator, stdBlockSize, starPattern, randomRandom, batchFetchAll)
})
b.Run("10Nodes-RandomRandom-BigBatch-AverageVariableSpeedNetwork", func(b *testing.B) {
subtestDistributeAndFetchWithRealDHT(b, 20, 10, averageNetworkDelay, averageBandwidthGenerator, stdBlockSize, starPattern, randomRandom, batchFetchAll)
})
b.Run("10Nodes-RandomRandom-BigBatch-SlowVariableSpeedNetwork", func(b *testing.B) {
subtestDistributeAndFetchWithRealDHT(b, 20, 10, slowNetworkDelay, slowBandwidthGenerator, stdBlockSize, starPattern, randomRandom, batchFetchAll)
})
out, _ := json.MarshalIndent(benchmarkLog, "", " ")
ioutil.WriteFile("tmp/rw-benchmark.json", out, 0666)
}

func subtestDistributeAndFetch(b *testing.B, numnodes, numblks int, d delay.D, df distFunc, ff fetchFunc) {
for i := 0; i < b.N; i++ {
start := time.Now()
net := tn.VirtualNetwork(mockrouting.NewServer(), d)

ig := testinstance.NewTestInstanceGenerator(net)
ig := testinstance.NewTestInstanceGenerator(net, bitswap.ProvideEnabled(false))
defer ig.Close()

bg := blocksutil.NewBlockGenerator()
Expand All @@ -174,7 +214,7 @@ func subtestDistributeAndFetchRateLimited(b *testing.B, numnodes, numblks int, d
start := time.Now()
net := tn.RateLimitedVirtualNetwork(mockrouting.NewServer(), d, rateLimitGenerator)

ig := testinstance.NewTestInstanceGenerator(net)
ig := testinstance.NewTestInstanceGenerator(net, bitswap.ProvideEnabled(false))
defer ig.Close()

instances := ig.Instances(numnodes)
Expand All @@ -184,6 +224,45 @@ func subtestDistributeAndFetchRateLimited(b *testing.B, numnodes, numblks int, d
}
}

func subtestDistributeAndFetchWithRealDHT(b *testing.B, numnodes, numblks int, d delay.D, rateLimitGenerator tn.RateLimitGenerator, blockSize int64, tf topologyFunc, df distFunc, ff fetchFunc) {
for i := 0; i < b.N; i++ {
start := time.Now()
ctx := context.Background()
mn := mocknet.New(ctx)
routing := tn.NewDHTServer()
net, err := tn.StreamNet(ctx, mn, routing)
if err != nil {
b.Fatal("Unable to initialize mocknet")
}

ig := testinstance.NewTestInstanceGenerator(net)
defer ig.Close()

var instances []testinstance.Instance
for j := 0; j < numnodes; j++ {
inst := ig.Next()
instances = append(instances, inst)
}
mn.LinkAll()
for i, inst := range instances {
// rate limit and delate connections
for j := i + 1; j < len(instances); j++ {
oinst := instances[j]
links := mn.LinksBetweenPeers(inst.Peer, oinst.Peer)
for _, link := range links {
link.SetOptions(mocknet.LinkOptions{Latency: d.NextWaitTime(), Bandwidth: rateLimitGenerator.NextRateLimit()})
}
}
}

tf(b, instances)

blocks := testutil.GenerateBlocksOfSize(numblks, blockSize)

runDistribution(b, instances, blocks, df, ff, start)
}
}

func runDistribution(b *testing.B, instances []testinstance.Instance, blocks []blocks.Block, df distFunc, ff fetchFunc, start time.Time) {

numnodes := len(instances)
Expand Down Expand Up @@ -216,13 +295,25 @@ func runDistribution(b *testing.B, instances []testinstance.Instance, blocks []b
b.Logf("send/recv: %d / %d", nst.MessagesSent, nst.MessagesRecvd)
}

func allToAll(b *testing.B, provs []testinstance.Instance, blocks []blocks.Block) {
for _, p := range provs {
if err := p.Blockstore().PutMany(blocks); err != nil {
func starPattern(b *testing.B, instances []testinstance.Instance) {
for _, inst := range instances {
// Connect in a star pattern to first node only (like a bootstrap)
inst.Adapter.ConnectTo(context.Background(), instances[0].Peer)
}
}

func putMany(b *testing.B, p testinstance.Instance, blocks []blocks.Block) {
for _, blk := range blocks {
if err := p.Exchange.HasBlock(blk); err != nil {
b.Fatal(err)
}
}
}
func allToAll(b *testing.B, provs []testinstance.Instance, blocks []blocks.Block) {
for _, p := range provs {
putMany(b, p, blocks)
}
}

// overlap1 gives the first 75 blocks to the first peer, and the last 75 blocks
// to the second peer. This means both peers have the middle 50 blocks
Expand All @@ -233,12 +324,8 @@ func overlap1(b *testing.B, provs []testinstance.Instance, blks []blocks.Block)
bill := provs[0]
jeff := provs[1]

if err := bill.Blockstore().PutMany(blks[:75]); err != nil {
b.Fatal(err)
}
if err := jeff.Blockstore().PutMany(blks[25:]); err != nil {
b.Fatal(err)
}
putMany(b, bill, blks[:75])
putMany(b, jeff, blks[25:])
}

// overlap2 gives every even numbered block to the first peer, odd numbered
Expand All @@ -250,16 +337,16 @@ func overlap2(b *testing.B, provs []testinstance.Instance, blks []blocks.Block)
bill := provs[0]
jeff := provs[1]

bill.Blockstore().Put(blks[0])
jeff.Blockstore().Put(blks[0])
bill.Exchange.HasBlock(blks[0])
jeff.Exchange.HasBlock(blks[0])
for i, blk := range blks {
if i%3 == 0 {
bill.Blockstore().Put(blk)
jeff.Blockstore().Put(blk)
bill.Exchange.HasBlock(blk)
jeff.Exchange.HasBlock(blk)
} else if i%2 == 1 {
bill.Blockstore().Put(blk)
bill.Exchange.HasBlock(blk)
} else {
jeff.Blockstore().Put(blk)
jeff.Exchange.HasBlock(blk)
}
}
}
Expand All @@ -272,16 +359,16 @@ func overlap3(b *testing.B, provs []testinstance.Instance, blks []blocks.Block)
bill := provs[0]
jeff := provs[1]

bill.Blockstore().Put(blks[0])
jeff.Blockstore().Put(blks[0])
bill.Exchange.HasBlock(blks[0])
jeff.Exchange.HasBlock(blks[0])
for i, blk := range blks {
if i%3 == 0 {
bill.Blockstore().Put(blk)
jeff.Blockstore().Put(blk)
bill.Exchange.HasBlock(blk)
jeff.Exchange.HasBlock(blk)
} else if i%2 == 1 {
bill.Blockstore().Put(blk)
bill.Exchange.HasBlock(blk)
} else {
jeff.Blockstore().Put(blk)
jeff.Exchange.HasBlock(blk)
}
}
}
Expand All @@ -291,7 +378,18 @@ func overlap3(b *testing.B, provs []testinstance.Instance, blks []blocks.Block)
// but we're mostly just testing performance of the sync algorithm
func onePeerPerBlock(b *testing.B, provs []testinstance.Instance, blks []blocks.Block) {
for _, blk := range blks {
provs[rand.Intn(len(provs))].Blockstore().Put(blk)
provs[rand.Intn(len(provs))].Exchange.HasBlock(blk)
}
}

// randomRandom distributes each block randomly -- to random peers a random amount of times
func randomRandom(b *testing.B, provs []testinstance.Instance, blks []blocks.Block) {
for _, blk := range blks {
dups := rand.Intn(len(provs)) + 1
perm := rand.Perm(len(provs))
for i := 0; i < dups; i++ {
provs[perm[i]].Exchange.HasBlock(blk)
}
}
}

Expand Down
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,10 @@ require (
github.com/libp2p/go-buffer-pool v0.0.2
github.com/libp2p/go-libp2p v0.1.1
github.com/libp2p/go-libp2p-core v0.0.3
github.com/libp2p/go-libp2p-kad-dht v0.1.0
github.com/libp2p/go-libp2p-loggables v0.1.0
github.com/libp2p/go-libp2p-netutil v0.1.0
github.com/libp2p/go-libp2p-record v0.1.0
github.com/libp2p/go-libp2p-testing v0.0.4
github.com/libp2p/go-msgio v0.0.4
github.com/mattn/go-colorable v0.1.2 // indirect
Expand Down
Loading