diff --git a/bitswap.go b/bitswap.go
index d7574118..03694302 100644
--- a/bitswap.go
+++ b/bitswap.go
@@ -15,6 +15,7 @@ import (
 	deciface "github.com/ipfs/go-bitswap/decision"
 	bsbpm "github.com/ipfs/go-bitswap/internal/blockpresencemanager"
 	"github.com/ipfs/go-bitswap/internal/decision"
+	"github.com/ipfs/go-bitswap/internal/defaults"
 	bsgetter "github.com/ipfs/go-bitswap/internal/getter"
 	bsmq "github.com/ipfs/go-bitswap/internal/messagequeue"
 	"github.com/ipfs/go-bitswap/internal/notifications"
@@ -42,15 +43,6 @@ var sflog = log.Desugar()
 
 var _ exchange.SessionExchange = (*Bitswap)(nil)
 
-const (
-	// these requests take at _least_ two minutes at the moment.
-	provideTimeout         = time.Minute * 3
-	defaultProvSearchDelay = time.Second
-
-	// Number of concurrent workers in decision engine that process requests to the blockstore
-	defaulEngineBlockstoreWorkerCount = 128
-)
-
 var (
 	// HasBlockBufferSize is the buffer size of the channel for new blocks
 	// that need to be provided. They should get pulled over by the
@@ -62,6 +54,8 @@ var (
 
 	// the 1<<18+15 is to observe old file chunks that are 1<<18 + 14 in size
 	metricsBuckets = []float64{1 << 6, 1 << 10, 1 << 14, 1 << 18, 1<<18 + 15, 1 << 22}
+
+	timeMetricsBuckets = []float64{1, 10, 30, 60, 90, 120, 600}
 )
 
 // Option defines the functional option type that can be used to configure
@@ -100,6 +94,36 @@ func EngineBlockstoreWorkerCount(count int) Option {
 	}
 }
 
+// EngineTaskWorkerCount sets the number of worker threads used inside the engine
+func EngineTaskWorkerCount(count int) Option {
+	if count <= 0 {
+		panic(fmt.Sprintf("Engine task worker count is %d but must be > 0", count))
+	}
+	return func(bs *Bitswap) {
+		bs.engineTaskWorkerCount = count
+	}
+}
+
+func TaskWorkerCount(count int) Option {
+	if count <= 0 {
+		panic(fmt.Sprintf("task worker count is %d but must be > 0", count))
+	}
+	return func(bs *Bitswap) {
+		bs.taskWorkerCount = count
+	}
+}
+
+// MaxOutstandingBytesPerPeer describes approximately how much work we are willing to have outstanding to a peer at any
+// given time. Setting it to 0 will disable any limiting.
+func MaxOutstandingBytesPerPeer(count int) Option {
+	if count < 0 {
+		panic(fmt.Sprintf("max outstanding bytes per peer is %d but must be >= 0", count))
+	}
+	return func(bs *Bitswap) {
+		bs.engineMaxOutstandingBytesPerPeer = count
+	}
+}
+
 // SetSendDontHaves indicates what to do when the engine receives a want-block
 // for a block that is not in the blockstore. Either
 // - Send a DONT_HAVE message
@@ -147,6 +171,17 @@ func New(parent context.Context, network bsnet.BitSwapNetwork,
 	sentHistogram := metrics.NewCtx(ctx, "sent_all_blocks_bytes", "Histogram of blocks sent by"+
 		" this bitswap").Histogram(metricsBuckets)
 
+	sendTimeHistogram := metrics.NewCtx(ctx, "send_times", "Histogram of how long it takes to send messages"+
+		" in this bitswap").Histogram(timeMetricsBuckets)
+
+	pendingEngineGauge := metrics.NewCtx(ctx, "pending_tasks", "Total number of pending tasks").Gauge()
+
+	activeEngineGauge := metrics.NewCtx(ctx, "active_tasks", "Total number of active tasks").Gauge()
+
+	pendingBlocksGauge := metrics.NewCtx(ctx, "pending_block_tasks", "Total number of pending blockstore tasks").Gauge()
+
+	activeBlocksGauge := metrics.NewCtx(ctx, "active_block_tasks", "Total number of active blockstore tasks").Gauge()
+
 	px := process.WithTeardown(func() error {
 		return nil
 	})
@@ -192,26 +227,30 @@ func New(parent context.Context, network bsnet.BitSwapNetwork,
 	sm = bssm.New(ctx, sessionFactory, sim, sessionPeerManagerFactory, bpm, pm, notif, network.Self())
 
 	bs = &Bitswap{
-		blockstore:                 bstore,
-		network:                    network,
-		process:                    px,
-		newBlocks:                  make(chan cid.Cid, HasBlockBufferSize),
-		provideKeys:                make(chan cid.Cid, provideKeysBufferSize),
-		pm:                         pm,
-		pqm:                        pqm,
-		sm:                         sm,
-		sim:                        sim,
-		notif:                      notif,
-		counters:                   new(counters),
-		dupMetric:                  dupHist,
-		allMetric:                  allHist,
-		sentHistogram:              sentHistogram,
-		provideEnabled:             true,
-		provSearchDelay:            defaultProvSearchDelay,
-		rebroadcastDelay:           delay.Fixed(time.Minute),
-		engineBstoreWorkerCount:    defaulEngineBlockstoreWorkerCount,
-		engineSetSendDontHaves:     true,
-		simulateDontHavesOnTimeout: true,
+		blockstore:                       bstore,
+		network:                          network,
+		process:                          px,
+		newBlocks:                        make(chan cid.Cid, HasBlockBufferSize),
+		provideKeys:                      make(chan cid.Cid, provideKeysBufferSize),
+		pm:                               pm,
+		pqm:                              pqm,
+		sm:                               sm,
+		sim:                              sim,
+		notif:                            notif,
+		counters:                         new(counters),
+		dupMetric:                        dupHist,
+		allMetric:                        allHist,
+		sentHistogram:                    sentHistogram,
+		sendTimeHistogram:                sendTimeHistogram,
+		provideEnabled:                   true,
+		provSearchDelay:                  defaults.ProvSearchDelay,
+		rebroadcastDelay:                 delay.Fixed(time.Minute),
+		engineBstoreWorkerCount:          defaults.BitswapEngineBlockstoreWorkerCount,
+		engineTaskWorkerCount:            defaults.BitswapEngineTaskWorkerCount,
+		taskWorkerCount:                  defaults.BitswapTaskWorkerCount,
+		engineMaxOutstandingBytesPerPeer: defaults.BitswapMaxOutstandingBytesPerPeer,
+		engineSetSendDontHaves:           true,
+		simulateDontHavesOnTimeout:       true,
 	}
 
 	// apply functional options before starting and running bitswap
@@ -220,7 +259,20 @@ func New(parent context.Context, network bsnet.BitSwapNetwork,
 	}
 
 	// Set up decision engine
-	bs.engine = decision.NewEngine(bstore, bs.engineBstoreWorkerCount, network.ConnectionManager(), network.Self(), bs.engineScoreLedger)
+	bs.engine = decision.NewEngine(
+		ctx,
+		bstore,
+		bs.engineBstoreWorkerCount,
+		bs.engineTaskWorkerCount,
+		bs.engineMaxOutstandingBytesPerPeer,
+		network.ConnectionManager(),
+		network.Self(),
+		bs.engineScoreLedger,
+		pendingEngineGauge,
+		activeEngineGauge,
+		pendingBlocksGauge,
+		activeBlocksGauge,
+	)
 	bs.engine.SetSendDontHaves(bs.engineSetSendDontHaves)
 
 	bs.pqm.Startup()
@@ -277,9 +329,10 @@ type Bitswap struct {
 	counters *counters
 
 	// Metrics interface metrics
-	dupMetric     metrics.Histogram
-	allMetric     metrics.Histogram
-	sentHistogram metrics.Histogram
+	dupMetric         metrics.Histogram
+	allMetric         metrics.Histogram
+	sentHistogram     metrics.Histogram
+	sendTimeHistogram metrics.Histogram
 
 	// External statistics interface
 	wiretap WireTap
@@ -303,6 +356,15 @@ type Bitswap struct {
 	// how many worker threads to start for decision engine blockstore worker
 	engineBstoreWorkerCount int
 
+	// how many worker threads to start for decision engine task worker
+	engineTaskWorkerCount int
+
+	// the total number of simultaneous threads sending outgoing messages
+	taskWorkerCount int
+
+	// the total amount of bytes that a peer should have outstanding; it is utilized by the decision engine
+	engineMaxOutstandingBytesPerPeer int
+
 	// the score ledger used by the decision engine
 	engineScoreLedger deciface.ScoreLedger
 
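Reviewer note: the three new options compose with the existing functional options accepted by New. A minimal usage sketch, assuming the caller already has a network and a blockstore (the helper name and the specific values below are illustrative, not part of this diff):

package example

import (
	"context"

	bitswap "github.com/ipfs/go-bitswap"
	bsnet "github.com/ipfs/go-bitswap/network"
	blockstore "github.com/ipfs/go-ipfs-blockstore"
	exchange "github.com/ipfs/go-ipfs-exchange-interface"
)

// newTunedBitswap is a hypothetical helper showing where the new options plug in.
func newTunedBitswap(ctx context.Context, network bsnet.BitSwapNetwork, bstore blockstore.Blockstore) exchange.Interface {
	return bitswap.New(ctx, network, bstore,
		// replaces the old package-level `var TaskWorkerCount = 8` removed from workers.go below
		bitswap.TaskWorkerCount(16),
		// replaces the unexported taskWorkerCount constant removed from the decision engine
		bitswap.EngineTaskWorkerCount(16),
		// approximate per-peer cap on queued work, in bytes; 0 disables the limit
		bitswap.MaxOutstandingBytesPerPeer(1<<20),
	)
}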
diff --git a/bitswap_test.go b/bitswap_test.go
index f28112d7..0da62dd3 100644
--- a/bitswap_test.go
+++ b/bitswap_test.go
@@ -285,7 +285,11 @@ func PerformDistributionTest(t *testing.T, numInstances, numBlocks int) {
 		t.SkipNow()
 	}
 	net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
-	ig := testinstance.NewTestInstanceGenerator(net, nil, nil)
+	ig := testinstance.NewTestInstanceGenerator(net, nil, []bitswap.Option{
+		bitswap.TaskWorkerCount(5),
+		bitswap.EngineTaskWorkerCount(5),
+		bitswap.MaxOutstandingBytesPerPeer(1 << 20),
+	})
 	defer ig.Close()
 	bg := blocksutil.NewBlockGenerator()
diff --git a/go.mod b/go.mod
index dde30204..91467c66 100644
--- a/go.mod
+++ b/go.mod
@@ -17,7 +17,7 @@ require (
 	github.com/ipfs/go-ipfs-util v0.0.2
 	github.com/ipfs/go-log v1.0.5
 	github.com/ipfs/go-metrics-interface v0.0.1
-	github.com/ipfs/go-peertaskqueue v0.2.0
+	github.com/ipfs/go-peertaskqueue v0.4.0
 	github.com/jbenet/goprocess v0.1.4
 	github.com/libp2p/go-buffer-pool v0.0.2
 	github.com/libp2p/go-libp2p v0.14.3
@@ -28,6 +28,7 @@ require (
 	github.com/libp2p/go-msgio v0.0.6
 	github.com/multiformats/go-multiaddr v0.3.3
 	github.com/multiformats/go-multistream v0.2.2
+	github.com/stretchr/testify v1.7.0
 	go.uber.org/zap v1.16.0
 )
 
diff --git a/go.sum b/go.sum
index 3b2c7979..d186ebf0 100644
--- a/go.sum
+++ b/go.sum
@@ -306,8 +306,8 @@ github.com/ipfs/go-log/v2 v2.1.3 h1:1iS3IU7aXRlbgUpN8yTTpJ53NXYjAe37vcI5+5nYrzk=
 github.com/ipfs/go-log/v2 v2.1.3/go.mod h1:/8d0SH3Su5Ooc31QlL1WysJhvyOTDCjcCZ9Axpmri6g=
 github.com/ipfs/go-metrics-interface v0.0.1 h1:j+cpbjYvu4R8zbleSs36gvB7jR+wsL2fGD6n0jO4kdg=
 github.com/ipfs/go-metrics-interface v0.0.1/go.mod h1:6s6euYU4zowdslK0GKHmqaIZ3j/b/tL7HTWtJ4VPgWY=
-github.com/ipfs/go-peertaskqueue v0.2.0 h1:2cSr7exUGKYyDeUyQ7P/nHPs9P7Ht/B+ROrpN1EJOjc=
-github.com/ipfs/go-peertaskqueue v0.2.0/go.mod h1:5/eNrBEbtSKWCG+kQK8K8fGNixoYUnr+P7jivavs9lY=
+github.com/ipfs/go-peertaskqueue v0.4.0 h1:x1hFgA4JOUJ3ntPfqLRu6v4k6kKL0p07r3RSg9JNyHI=
+github.com/ipfs/go-peertaskqueue v0.4.0/go.mod h1:KL9F49hXJMoXCad8e5anivjN+kWdr+CyGcyh4K6doLc=
 github.com/jackpal/gateway v1.0.5/go.mod h1:lTpwd4ACLXmpyiCTRtfiNyVnUmqT9RivzCDQetPfnjA=
 github.com/jackpal/go-nat-pmp v1.0.1/go.mod h1:QPH045xvCAeXUZOxsnwmrtiCoxIr9eob+4orBN1SBKc=
 github.com/jackpal/go-nat-pmp v1.0.2 h1:KzKSgb7qkJvOUTqYl9/Hg/me3pWgBmERKrTGD7BdWus=
diff --git a/internal/decision/blockstoremanager.go b/internal/decision/blockstoremanager.go
index dc022caf..7d6864eb 100644
--- a/internal/decision/blockstoremanager.go
+++ b/internal/decision/blockstoremanager.go
@@ -8,25 +8,36 @@ import (
 	blocks "github.com/ipfs/go-block-format"
 	cid "github.com/ipfs/go-cid"
 	bstore "github.com/ipfs/go-ipfs-blockstore"
+	"github.com/ipfs/go-metrics-interface"
 	process "github.com/jbenet/goprocess"
 )
 
 // blockstoreManager maintains a pool of workers that make requests to the blockstore.
 type blockstoreManager struct {
-	bs          bstore.Blockstore
-	workerCount int
-	jobs        chan func()
-	px          process.Process
+	bs           bstore.Blockstore
+	workerCount  int
+	jobs         chan func()
+	px           process.Process
+	pendingGauge metrics.Gauge
+	activeGauge  metrics.Gauge
 }
 
 // newBlockstoreManager creates a new blockstoreManager with the given context
 // and number of workers
-func newBlockstoreManager(bs bstore.Blockstore, workerCount int) *blockstoreManager {
+func newBlockstoreManager(
+	ctx context.Context,
+	bs bstore.Blockstore,
+	workerCount int,
+	pendingGauge metrics.Gauge,
+	activeGauge metrics.Gauge,
+) *blockstoreManager {
 	return &blockstoreManager{
-		bs:          bs,
-		workerCount: workerCount,
-		jobs:        make(chan func()),
-		px:          process.WithTeardown(func() error { return nil }),
+		bs:           bs,
+		workerCount:  workerCount,
+		jobs:         make(chan func()),
+		px:           process.WithTeardown(func() error { return nil }),
+		pendingGauge: pendingGauge,
+		activeGauge:  activeGauge,
 	}
 }
 
@@ -46,7 +57,10 @@ func (bsm *blockstoreManager) worker(px process.Process) {
 		case <-px.Closing():
 			return
 		case job := <-bsm.jobs:
+			bsm.pendingGauge.Dec()
+			bsm.activeGauge.Inc()
 			job()
+			bsm.activeGauge.Dec()
 		}
 	}
 }
@@ -58,6 +72,7 @@ func (bsm *blockstoreManager) addJob(ctx context.Context, job func()) error {
 	case <-bsm.px.Closing():
 		return fmt.Errorf("shutting down")
 	case bsm.jobs <- job:
+		bsm.pendingGauge.Inc()
 		return nil
 	}
 }
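Reviewer note: the gauge accounting added above follows the usual instrumented-queue pattern: pending is incremented once a job has been accepted onto the channel, decremented when a worker picks the job up, and active brackets the job's execution. The same pattern in isolation (a hypothetical standalone sketch, not code from this diff):

package example

import "github.com/ipfs/go-metrics-interface"

type instrumentedQueue struct {
	jobs    chan func()
	pending metrics.Gauge // accepted but not yet picked up by a worker
	active  metrics.Gauge // currently executing
}

func (q *instrumentedQueue) enqueue(job func()) {
	q.jobs <- job
	q.pending.Inc() // as in addJob above: counted only after the send succeeds
}

func (q *instrumentedQueue) worker() {
	for job := range q.jobs {
		q.pending.Dec()
		q.active.Inc()
		job()
		q.active.Dec()
	}
}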
diff --git a/internal/decision/blockstoremanager_test.go b/internal/decision/blockstoremanager_test.go
index e8d6bb01..ad447738 100644
--- a/internal/decision/blockstoremanager_test.go
+++ b/internal/decision/blockstoremanager_test.go
@@ -9,6 +9,7 @@ import (
 
 	"github.com/ipfs/go-bitswap/internal/testutil"
 	cid "github.com/ipfs/go-cid"
+	"github.com/ipfs/go-metrics-interface"
 
 	blocks "github.com/ipfs/go-block-format"
 	ds "github.com/ipfs/go-datastore"
@@ -19,13 +20,23 @@ import (
 	process "github.com/jbenet/goprocess"
 )
 
+func newBlockstoreManagerForTesting(
+	ctx context.Context,
+	bs blockstore.Blockstore,
+	workerCount int,
+) *blockstoreManager {
+	testPendingBlocksGauge := metrics.NewCtx(ctx, "pending_block_tasks", "Total number of pending blockstore tasks").Gauge()
+	testActiveBlocksGauge := metrics.NewCtx(ctx, "active_block_tasks", "Total number of active blockstore tasks").Gauge()
+	return newBlockstoreManager(ctx, bs, workerCount, testPendingBlocksGauge, testActiveBlocksGauge)
+}
+
 func TestBlockstoreManagerNotFoundKey(t *testing.T) {
 	ctx := context.Background()
 	bsdelay := delay.Fixed(3 * time.Millisecond)
 	dstore := ds_sync.MutexWrap(delayed.New(ds.NewMapDatastore(), bsdelay))
 	bstore := blockstore.NewBlockstore(ds_sync.MutexWrap(dstore))
 
-	bsm := newBlockstoreManager(bstore, 5)
+	bsm := newBlockstoreManagerForTesting(ctx, bstore, 5)
 	bsm.start(process.WithTeardown(func() error { return nil }))
 
 	cids := testutil.GenerateCids(4)
@@ -64,7 +75,7 @@ func TestBlockstoreManager(t *testing.T) {
 	dstore := ds_sync.MutexWrap(delayed.New(ds.NewMapDatastore(), bsdelay))
 	bstore := blockstore.NewBlockstore(ds_sync.MutexWrap(dstore))
 
-	bsm := newBlockstoreManager(bstore, 5)
+	bsm := newBlockstoreManagerForTesting(ctx, bstore, 5)
 	bsm.start(process.WithTeardown(func() error { return nil }))
 
 	exp := make(map[cid.Cid]blocks.Block)
@@ -148,7 +159,7 @@ func TestBlockstoreManagerConcurrency(t *testing.T) {
 	bstore := blockstore.NewBlockstore(ds_sync.MutexWrap(dstore))
 
 	workerCount := 5
-	bsm := newBlockstoreManager(bstore, workerCount)
+	bsm := newBlockstoreManagerForTesting(ctx, bstore, workerCount)
 	bsm.start(process.WithTeardown(func() error { return nil }))
 
 	blkSize := int64(8 * 1024)
@@ -190,7 +201,7 @@ func TestBlockstoreManagerClose(t *testing.T) {
 	dstore := ds_sync.MutexWrap(delayed.New(ds.NewMapDatastore(), bsdelay))
 	bstore := blockstore.NewBlockstore(ds_sync.MutexWrap(dstore))
 
-	bsm := newBlockstoreManager(bstore, 3)
+	bsm := newBlockstoreManagerForTesting(ctx, bstore, 3)
 	px := process.WithTeardown(func() error { return nil })
 	bsm.start(px)
 
@@ -229,7 +240,8 @@ func TestBlockstoreManagerCtxDone(t *testing.T) {
 	underlyingBstore := blockstore.NewBlockstore(underlyingDstore)
 	bstore := blockstore.NewBlockstore(dstore)
 
-	bsm := newBlockstoreManager(bstore, 3)
+	ctx := context.Background()
+	bsm := newBlockstoreManagerForTesting(ctx, bstore, 3)
 	proc := process.WithTeardown(func() error { return nil })
 	bsm.start(proc)
 
diff --git a/internal/decision/engine.go b/internal/decision/engine.go
index 31c50e3f..76519bd3 100644
--- a/internal/decision/engine.go
+++ b/internal/decision/engine.go
@@ -16,6 +16,7 @@ import (
 	"github.com/ipfs/go-cid"
 	bstore "github.com/ipfs/go-ipfs-blockstore"
 	logging "github.com/ipfs/go-log"
+	"github.com/ipfs/go-metrics-interface"
 	"github.com/ipfs/go-peertaskqueue"
 	"github.com/ipfs/go-peertaskqueue/peertask"
 	process "github.com/jbenet/goprocess"
@@ -73,9 +74,6 @@ const (
 	// maxBlockSizeReplaceHasWithBlock is the maximum size of the block in
 	// bytes up to which we will replace a want-have with a want-block
 	maxBlockSizeReplaceHasWithBlock = 1024
-
-	// Number of concurrent workers that pull tasks off the request queue
-	taskWorkerCount = 8
 )
 
 // Envelope contains a message for a Peer.
@@ -167,16 +165,65 @@ type Engine struct {
 	sendDontHaves bool
 
 	self peer.ID
+
+	// metrics gauge for total pending tasks across all workers
+	pendingGauge metrics.Gauge
+
+	// metrics gauge for total active tasks across all workers
+	activeGauge metrics.Gauge
+
+	// used to ensure metrics are reported each fixed number of operations
+	metricsLock         sync.Mutex
+	metricUpdateCounter int
 }
 
-// NewEngine creates a new block sending engine for the given block store
-func NewEngine(bs bstore.Blockstore, bstoreWorkerCount int, peerTagger PeerTagger, self peer.ID, scoreLedger ScoreLedger) *Engine {
-	return newEngine(bs, bstoreWorkerCount, peerTagger, self, maxBlockSizeReplaceHasWithBlock, scoreLedger)
+// NewEngine creates a new block sending engine for the given block store.
+// maxOutstandingBytesPerPeer hints to the peer task queue not to give a peer more tasks if it has some maximum
+// work already outstanding.
+func NewEngine(
+	ctx context.Context,
+	bs bstore.Blockstore,
+	bstoreWorkerCount,
+	engineTaskWorkerCount, maxOutstandingBytesPerPeer int,
+	peerTagger PeerTagger,
+	self peer.ID,
+	scoreLedger ScoreLedger,
+	pendingEngineGauge metrics.Gauge,
+	activeEngineGauge metrics.Gauge,
+	pendingBlocksGauge metrics.Gauge,
+	activeBlocksGauge metrics.Gauge,
+) *Engine {
+	return newEngine(
+		ctx,
+		bs,
+		bstoreWorkerCount,
+		engineTaskWorkerCount,
+		maxOutstandingBytesPerPeer,
+		peerTagger,
+		self,
+		maxBlockSizeReplaceHasWithBlock,
+		scoreLedger,
+		pendingEngineGauge,
+		activeEngineGauge,
+		pendingBlocksGauge,
+		activeBlocksGauge,
+	)
 }
 
-// This constructor is used by the tests
-func newEngine(bs bstore.Blockstore, bstoreWorkerCount int, peerTagger PeerTagger, self peer.ID,
-	maxReplaceSize int, scoreLedger ScoreLedger) *Engine {
+func newEngine(
+	ctx context.Context,
+	bs bstore.Blockstore,
+	bstoreWorkerCount,
+	engineTaskWorkerCount, maxOutstandingBytesPerPeer int,
+	peerTagger PeerTagger,
+	self peer.ID,
+	maxReplaceSize int,
+	scoreLedger ScoreLedger,
+	pendingEngineGauge metrics.Gauge,
+	activeEngineGauge metrics.Gauge,
+	pendingBlocksGauge metrics.Gauge,
+	activeBlocksGauge metrics.Gauge,
+) *Engine {
 
 	if scoreLedger == nil {
 		scoreLedger = NewDefaultScoreLedger()
@@ -185,16 +232,18 @@ func newEngine(bs bstore.Blockstore, bstoreWorkerCount int, peerTagger PeerTagge
 	e := &Engine{
 		ledgerMap:                       make(map[peer.ID]*ledger),
 		scoreLedger:                     scoreLedger,
-		bsm:                             newBlockstoreManager(bs, bstoreWorkerCount),
+		bsm:                             newBlockstoreManager(ctx, bs, bstoreWorkerCount, pendingBlocksGauge, activeBlocksGauge),
 		peerTagger:                      peerTagger,
 		outbox:                          make(chan (<-chan *Envelope), outboxChanBuffer),
 		workSignal:                      make(chan struct{}, 1),
 		ticker:                          time.NewTicker(time.Millisecond * 100),
 		maxBlockSizeReplaceHasWithBlock: maxReplaceSize,
-		taskWorkerCount:                 taskWorkerCount,
+		taskWorkerCount:                 engineTaskWorkerCount,
 		sendDontHaves:                   true,
 		self:                            self,
 		peerLedger:                      newPeerLedger(),
+		pendingGauge:                    pendingEngineGauge,
+		activeGauge:                     activeEngineGauge,
 	}
 	e.tagQueued = fmt.Sprintf(tagFormat, "queued", uuid.New().String())
 	e.tagUseful = fmt.Sprintf(tagFormat, "useful", uuid.New().String())
@@ -202,10 +251,24 @@ func newEngine(bs bstore.Blockstore, bstoreWorkerCount int, peerTagger PeerTagge
 		peertaskqueue.OnPeerAddedHook(e.onPeerAdded),
 		peertaskqueue.OnPeerRemovedHook(e.onPeerRemoved),
 		peertaskqueue.TaskMerger(newTaskMerger()),
-		peertaskqueue.IgnoreFreezing(true))
+		peertaskqueue.IgnoreFreezing(true),
+		peertaskqueue.MaxOutstandingWorkPerPeer(maxOutstandingBytesPerPeer))
 	return e
 }
 
+func (e *Engine) updateMetrics() {
+	e.metricsLock.Lock()
+	c := e.metricUpdateCounter
+	e.metricUpdateCounter++
+	e.metricsLock.Unlock()
+
+	if c%100 == 0 {
+		stats := e.peerRequestQueue.Stats()
+		e.activeGauge.Set(float64(stats.NumActive))
+		e.pendingGauge.Set(float64(stats.NumPending))
+	}
+}
+
 // SetSendDontHaves indicates what to do when the engine receives a want-block
 // for a block that is not in the blockstore. Either
 // - Send a DONT_HAVE message
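Reviewer note: updateMetrics deliberately samples rather than measuring every operation. peerRequestQueue.Stats() is read and both gauges are written only on every 100th call, so the hot paths that invoke it (PopTasks and PushTasks, next hunks) stay cheap; the trade-off is that the gauges can be up to 99 operations stale. The same pattern in isolation (a hypothetical generic helper, assuming only the go-metrics-interface Gauge type):

package example

import (
	"sync"

	"github.com/ipfs/go-metrics-interface"
)

// sampledGauges reports queue depth on every Nth update instead of every one.
type sampledGauges struct {
	mu      sync.Mutex
	counter int
	every   int // e.g. 100, matching the engine above

	pending metrics.Gauge
	active  metrics.Gauge
}

// update takes a stats callback so the comparatively expensive read only
// happens on the sampled calls.
func (s *sampledGauges) update(stats func() (pending, active int)) {
	s.mu.Lock()
	c := s.counter
	s.counter++
	s.mu.Unlock()

	if c%s.every == 0 {
		p, a := stats()
		s.pending.Set(float64(p))
		s.active.Set(float64(a))
	}
}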
@@ -316,18 +379,21 @@ func (e *Engine) nextEnvelope(ctx context.Context) (*Envelope, error) {
 	for {
 		// Pop some tasks off the request queue
 		p, nextTasks, pendingBytes := e.peerRequestQueue.PopTasks(targetMessageSize)
+		e.updateMetrics()
 		for len(nextTasks) == 0 {
 			select {
 			case <-ctx.Done():
 				return nil, ctx.Err()
 			case <-e.workSignal:
 				p, nextTasks, pendingBytes = e.peerRequestQueue.PopTasks(targetMessageSize)
+				e.updateMetrics()
 			case <-e.ticker.C:
 				// When a task is cancelled, the queue may be "frozen" for a
 				// period of time. We periodically "thaw" the queue to make
 				// sure it doesn't get stuck in a frozen state.
 				e.peerRequestQueue.ThawRound()
 				p, nextTasks, pendingBytes = e.peerRequestQueue.PopTasks(targetMessageSize)
+				e.updateMetrics()
 			}
 		}
 
@@ -557,6 +623,7 @@ func (e *Engine) MessageReceived(ctx context.Context, p peer.ID, m bsmsg.BitSwap
 	// Push entries onto the request queue
 	if len(activeEntries) > 0 {
 		e.peerRequestQueue.PushTasks(p, activeEntries...)
+		e.updateMetrics()
 	}
 }
 
@@ -646,6 +713,7 @@ func (e *Engine) ReceiveFrom(from peer.ID, blks []blocks.Block) {
 					SendDontHave: false,
 				},
 			})
+			e.updateMetrics()
 		}
 	}
 	e.lock.RUnlock()
diff --git a/internal/decision/engine_test.go b/internal/decision/engine_test.go
index d8c83678..d8445fde 100644
--- a/internal/decision/engine_test.go
+++ b/internal/decision/engine_test.go
@@ -11,9 +11,11 @@ import (
 	"time"
 
 	"github.com/benbjohnson/clock"
+	"github.com/ipfs/go-bitswap/internal/defaults"
 	"github.com/ipfs/go-bitswap/internal/testutil"
 	message "github.com/ipfs/go-bitswap/message"
 	pb "github.com/ipfs/go-bitswap/message/pb"
+	"github.com/ipfs/go-metrics-interface"
 
 	blocks "github.com/ipfs/go-block-format"
 	ds "github.com/ipfs/go-datastore"
@@ -97,7 +99,7 @@ func newTestEngine(ctx context.Context, idStr string) engineSet {
 func newTestEngineWithSampling(ctx context.Context, idStr string, peerSampleInterval time.Duration, sampleCh chan struct{}, clock clock.Clock) engineSet {
 	fpt := &fakePeerTagger{}
 	bs := blockstore.NewBlockstore(dssync.MutexWrap(ds.NewMapDatastore()))
-	e := newEngine(bs, 4, fpt, "localhost", 0, NewTestScoreLedger(peerSampleInterval, sampleCh, clock))
+	e := newEngineForTesting(ctx, bs, 4, defaults.BitswapEngineTaskWorkerCount, defaults.BitswapMaxOutstandingBytesPerPeer, fpt, "localhost", 0, NewTestScoreLedger(peerSampleInterval, sampleCh, clock))
 	e.StartWorkers(ctx, process.WithTeardown(func() error { return nil }))
 	return engineSet{
 		Peer: peer.ID(idStr),
@@ -182,10 +184,42 @@ func peerIsPartner(p peer.ID, e *Engine) bool {
 	return false
 }
 
+func newEngineForTesting(
+	ctx context.Context,
+	bs blockstore.Blockstore,
+	bstoreWorkerCount,
+	engineTaskWorkerCount, maxOutstandingBytesPerPeer int,
+	peerTagger PeerTagger,
+	self peer.ID,
+	maxReplaceSize int,
+	scoreLedger ScoreLedger,
+) *Engine {
+	testPendingEngineGauge := metrics.NewCtx(ctx, "pending_tasks", "Total number of pending tasks").Gauge()
+	testActiveEngineGauge := metrics.NewCtx(ctx, "active_tasks", "Total number of active tasks").Gauge()
+	testPendingBlocksGauge := metrics.NewCtx(ctx, "pending_block_tasks", "Total number of pending blockstore tasks").Gauge()
+	testActiveBlocksGauge := metrics.NewCtx(ctx, "active_block_tasks", "Total number of active blockstore tasks").Gauge()
+	return newEngine(
+		ctx,
+		bs,
+		bstoreWorkerCount,
+		engineTaskWorkerCount,
+		maxOutstandingBytesPerPeer,
+		peerTagger,
+		self,
+		maxReplaceSize,
+		scoreLedger,
+		testPendingEngineGauge,
+		testActiveEngineGauge,
+		testPendingBlocksGauge,
+		testActiveBlocksGauge,
+	)
+}
+
 func TestOutboxClosedWhenEngineClosed(t *testing.T) {
 	t.SkipNow() // TODO implement *Engine.Close
-	e := newEngine(blockstore.NewBlockstore(dssync.MutexWrap(ds.NewMapDatastore())), 4, &fakePeerTagger{}, "localhost", 0, NewTestScoreLedger(shortTerm, nil, clock.New()))
-	e.StartWorkers(context.Background(), process.WithTeardown(func() error { return nil }))
+	ctx := context.Background()
+	e := newEngineForTesting(ctx, blockstore.NewBlockstore(dssync.MutexWrap(ds.NewMapDatastore())), 4, defaults.BitswapEngineTaskWorkerCount, defaults.BitswapMaxOutstandingBytesPerPeer, &fakePeerTagger{}, "localhost", 0, NewTestScoreLedger(shortTerm, nil, clock.New()))
+	e.StartWorkers(ctx, process.WithTeardown(func() error { return nil }))
 	var wg sync.WaitGroup
 	wg.Add(1)
 	go func() {
@@ -512,8 +546,9 @@ func TestPartnerWantHaveWantBlockNonActive(t *testing.T) {
 		testCases = onlyTestCases
 	}
 
-	e := newEngine(bs, 4, &fakePeerTagger{}, "localhost", 0, NewTestScoreLedger(shortTerm, nil, clock.New()))
-	e.StartWorkers(context.Background(), process.WithTeardown(func() error { return nil }))
+	ctx := context.Background()
+	e := newEngineForTesting(ctx, bs, 4, defaults.BitswapEngineTaskWorkerCount, defaults.BitswapMaxOutstandingBytesPerPeer, &fakePeerTagger{}, "localhost", 0, NewTestScoreLedger(shortTerm, nil, clock.New()))
+	e.StartWorkers(ctx, process.WithTeardown(func() error { return nil }))
 	for i, testCase := range testCases {
 		t.Logf("Test case %d:", i)
 		for _, wl := range testCase.wls {
@@ -668,8 +703,9 @@ func TestPartnerWantHaveWantBlockActive(t *testing.T) {
 		testCases = onlyTestCases
 	}
 
-	e := newEngine(bs, 4, &fakePeerTagger{}, "localhost", 0, NewTestScoreLedger(shortTerm, nil, clock.New()))
-	e.StartWorkers(context.Background(), process.WithTeardown(func() error { return nil }))
+	ctx := context.Background()
+	e := newEngineForTesting(ctx, bs, 4, defaults.BitswapEngineTaskWorkerCount, defaults.BitswapMaxOutstandingBytesPerPeer, &fakePeerTagger{}, "localhost", 0, NewTestScoreLedger(shortTerm, nil, clock.New()))
+	e.StartWorkers(ctx, process.WithTeardown(func() error { return nil }))
 
 	var next envChan
 	for i, testCase := range testCases {
@@ -853,7 +889,7 @@ func TestPartnerWantsThenCancels(t *testing.T) {
 	ctx := context.Background()
 	for i := 0; i < numRounds; i++ {
 		expected := make([][]string, 0, len(testcases))
-		e := newEngine(bs, 4, &fakePeerTagger{}, "localhost", 0, NewTestScoreLedger(shortTerm, nil, clock.New()))
+		e := newEngineForTesting(ctx, bs, 4, defaults.BitswapEngineTaskWorkerCount, defaults.BitswapMaxOutstandingBytesPerPeer, &fakePeerTagger{}, "localhost", 0, NewTestScoreLedger(shortTerm, nil, clock.New()))
 		e.StartWorkers(ctx, process.WithTeardown(func() error { return nil }))
 		for _, testcase := range testcases {
 			set := testcase[0]
@@ -878,8 +914,9 @@ func TestSendReceivedBlocksToPeersThatWantThem(t *testing.T) {
 	partner := libp2ptest.RandPeerIDFatal(t)
 	otherPeer := libp2ptest.RandPeerIDFatal(t)
 
-	e := newEngine(bs, 4, &fakePeerTagger{}, "localhost", 0, NewTestScoreLedger(shortTerm, nil, clock.New()))
-	e.StartWorkers(context.Background(), process.WithTeardown(func() error { return nil }))
+	ctx := context.Background()
+	e := newEngineForTesting(ctx, bs, 4, defaults.BitswapEngineTaskWorkerCount, defaults.BitswapMaxOutstandingBytesPerPeer, &fakePeerTagger{}, "localhost", 0, NewTestScoreLedger(shortTerm, nil, clock.New()))
+	e.StartWorkers(ctx, process.WithTeardown(func() error { return nil }))
 
 	blks := testutil.GenerateBlocksOfSize(4, 8*1024)
 	msg := message.New(false)
@@ -922,8 +959,9 @@ func TestSendDontHave(t *testing.T) {
 	partner := libp2ptest.RandPeerIDFatal(t)
 	otherPeer := libp2ptest.RandPeerIDFatal(t)
 
-	e := newEngine(bs, 4, &fakePeerTagger{}, "localhost", 0, NewTestScoreLedger(shortTerm, nil, clock.New()))
-	e.StartWorkers(context.Background(), process.WithTeardown(func() error { return nil }))
+	ctx := context.Background()
+	e := newEngineForTesting(ctx, bs, 4, defaults.BitswapEngineTaskWorkerCount, defaults.BitswapMaxOutstandingBytesPerPeer, &fakePeerTagger{}, "localhost", 0, NewTestScoreLedger(shortTerm, nil, clock.New()))
+	e.StartWorkers(ctx, process.WithTeardown(func() error { return nil }))
 
 	blks := testutil.GenerateBlocksOfSize(4, 8*1024)
 	msg := message.New(false)
@@ -986,8 +1024,9 @@ func TestWantlistForPeer(t *testing.T) {
 	partner := libp2ptest.RandPeerIDFatal(t)
 	otherPeer := libp2ptest.RandPeerIDFatal(t)
 
-	e := newEngine(bs, 4, &fakePeerTagger{}, "localhost", 0, NewTestScoreLedger(shortTerm, nil, clock.New()))
-	e.StartWorkers(context.Background(), process.WithTeardown(func() error { return nil }))
+	ctx := context.Background()
+	e := newEngineForTesting(ctx, bs, 4, defaults.BitswapEngineTaskWorkerCount, defaults.BitswapMaxOutstandingBytesPerPeer, &fakePeerTagger{}, "localhost", 0, NewTestScoreLedger(shortTerm, nil, clock.New()))
+	e.StartWorkers(ctx, process.WithTeardown(func() error { return nil }))
 
 	blks := testutil.GenerateBlocksOfSize(4, 8*1024)
 	msg := message.New(false)
diff --git a/internal/defaults/defaults.go b/internal/defaults/defaults.go
new file mode 100644
index 00000000..7237a996
--- /dev/null
+++ b/internal/defaults/defaults.go
@@ -0,0 +1,20 @@
+package defaults
+
+import (
+	"time"
+)
+
+const (
+	// these requests take at _least_ two minutes at the moment.
+	ProvideTimeout  = time.Minute * 3
+	ProvSearchDelay = time.Second
+
+	// Number of concurrent workers in decision engine that process requests to the blockstore
+	BitswapEngineBlockstoreWorkerCount = 128
+	// the total number of simultaneous threads sending outgoing messages
+	BitswapTaskWorkerCount = 8
+	// how many worker threads to start for decision engine task worker
+	BitswapEngineTaskWorkerCount = 8
+	// the total amount of bytes that a peer should have outstanding; it is utilized by the decision engine
+	BitswapMaxOutstandingBytesPerPeer = 1 << 20
+)
diff --git a/network/ipfs_impl.go b/network/ipfs_impl.go
index b05ce558..7457aeb8 100644
--- a/network/ipfs_impl.go
+++ b/network/ipfs_impl.go
@@ -28,7 +28,11 @@ import (
 var log = logging.Logger("bitswap_network")
 
 var connectTimeout = time.Second * 5
-var sendMessageTimeout = time.Minute * 10
+
+var maxSendTimeout = 2 * time.Minute
+var minSendTimeout = 10 * time.Second
+var sendLatency = 2 * time.Second
+var minSendRate = (100 * 1000) / 8 // 100kbit/s
 
 // NewFromIpfsHost returns a BitSwapNetwork supported by underlying IPFS host.
 func NewFromIpfsHost(host host.Host, r routing.ContentRouting, opts ...NetOpt) BitSwapNetwork {
@@ -300,7 +304,7 @@ func setDefaultOpts(opts *MessageSenderOpts) *MessageSenderOpts {
 		copy.MaxRetries = 3
 	}
 	if opts.SendTimeout == 0 {
-		copy.SendTimeout = sendMessageTimeout
+		copy.SendTimeout = maxSendTimeout
 	}
 	if opts.SendErrorBackoff == 0 {
 		copy.SendErrorBackoff = 100 * time.Millisecond
@@ -308,6 +312,17 @@ func setDefaultOpts(opts *MessageSenderOpts) *MessageSenderOpts {
 	return &copy
 }
 
+func sendTimeout(size int) time.Duration {
+	timeout := sendLatency
+	timeout += time.Duration((uint64(time.Second) * uint64(size)) / uint64(minSendRate))
+	if timeout > maxSendTimeout {
+		timeout = maxSendTimeout
+	} else if timeout < minSendTimeout {
+		timeout = minSendTimeout
+	}
+	return timeout
+}
+
 func (bsnet *impl) SendMessage(
 	ctx context.Context,
 	p peer.ID,
@@ -321,7 +336,8 @@ func (bsnet *impl) SendMessage(
 		return err
 	}
 
-	if err = bsnet.msgToStream(ctx, s, outgoing, sendMessageTimeout); err != nil {
+	timeout := sendTimeout(outgoing.Size())
+	if err = bsnet.msgToStream(ctx, s, outgoing, timeout); err != nil {
 		_ = s.Reset()
 		return err
 	}
diff --git a/network/ipfs_impl_timeout_test.go b/network/ipfs_impl_timeout_test.go
new file mode 100644
index 00000000..fdbe8e95
--- /dev/null
+++ b/network/ipfs_impl_timeout_test.go
@@ -0,0 +1,24 @@
+package network
+
+import (
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/require"
+)
+
+func TestSendTimeout(t *testing.T) {
+	require.Equal(t, minSendTimeout, sendTimeout(0))
+	require.Equal(t, maxSendTimeout, sendTimeout(1<<30))
+
+	// Check a 1MiB block (very large)
+	oneMiB := uint64(1 << 20)
+	hundredKbit := uint64(100 * 1000)
+	hundredKB := hundredKbit / 8
+	expectedTime := sendLatency + time.Duration(oneMiB*uint64(time.Second)/hundredKB)
+	actualTime := sendTimeout(int(oneMiB))
+	require.Equal(t, expectedTime, actualTime)
+
+	// Check a 256KiB block (the expected typical block size)
+	require.InDelta(t, 25*time.Second, sendTimeout(256<<10), float64(5*time.Second))
+}
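Reviewer note: the fixed 10-minute sendMessageTimeout is replaced by a size-scaled timeout: sendLatency + size/minSendRate, clamped to [minSendTimeout, maxSendTimeout]. With the defaults above that is 2s + size/12500 bytes-per-second, bounded to [10s, 2m]: a typical 256KiB block gets 2s + 262144/12500 ≈ 23s, a 1MiB message gets 2s + 1048576/12500 ≈ 85.9s, and anything tiny is floored at 10s. A standalone copy of the formula for sanity checking (constants duplicated here because sendTimeout is unexported):

package main

import (
	"fmt"
	"time"
)

const (
	maxSendTimeout = 2 * time.Minute
	minSendTimeout = 10 * time.Second
	sendLatency    = 2 * time.Second
	minSendRate    = (100 * 1000) / 8 // 100kbit/s => 12500 bytes/s
)

// sendTimeout mirrors the function added to network/ipfs_impl.go above.
func sendTimeout(size int) time.Duration {
	timeout := sendLatency
	timeout += time.Duration((uint64(time.Second) * uint64(size)) / uint64(minSendRate))
	if timeout > maxSendTimeout {
		timeout = maxSendTimeout
	} else if timeout < minSendTimeout {
		timeout = minSendTimeout
	}
	return timeout
}

func main() {
	fmt.Println(sendTimeout(0))         // 10s: floored, even for empty messages
	fmt.Println(sendTimeout(256 << 10)) // ~22.97s for a typical 256KiB block
	fmt.Println(sendTimeout(1 << 20))   // ~85.89s for a 1MiB message
	fmt.Println(sendTimeout(1 << 30))   // 2m0s: clamped at the cap
}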
diff --git a/workers.go b/workers.go
index 5db53423..c5b62d25 100644
--- a/workers.go
+++ b/workers.go
@@ -3,8 +3,10 @@ package bitswap
 import (
 	"context"
 	"fmt"
+	"time"
 
 	engine "github.com/ipfs/go-bitswap/internal/decision"
+	"github.com/ipfs/go-bitswap/internal/defaults"
 	pb "github.com/ipfs/go-bitswap/message/pb"
 	cid "github.com/ipfs/go-cid"
 	process "github.com/jbenet/goprocess"
@@ -12,14 +14,10 @@ import (
 	"go.uber.org/zap"
 )
 
-// TaskWorkerCount is the total number of simultaneous threads sending
-// outgoing messages
-var TaskWorkerCount = 8
-
 func (bs *Bitswap) startWorkers(ctx context.Context, px process.Process) {
 
 	// Start up workers to handle requests from other nodes for the data on this node
-	for i := 0; i < TaskWorkerCount; i++ {
+	for i := 0; i < bs.taskWorkerCount; i++ {
 		i := i
 		px.Go(func(px process.Process) {
 			bs.taskWorker(ctx, i)
@@ -52,6 +50,8 @@ func (bs *Bitswap) taskWorker(ctx context.Context, id int) {
 					continue
 				}
 
+				start := time.Now()
+
 				// TODO: Only record message as sent if there was no error?
 				// Ideally, yes. But we'd need some way to trigger a retry and/or drop
 				// the peer.
@@ -60,6 +60,10 @@ func (bs *Bitswap) taskWorker(ctx context.Context, id int) {
 					bs.wiretap.MessageSent(envelope.Peer, envelope.Message)
 				}
 				bs.sendBlocks(ctx, envelope)
+
+				dur := time.Since(start)
+				bs.sendTimeHistogram.Observe(dur.Seconds())
+
 			case <-ctx.Done():
 				return
 			}
@@ -159,7 +163,7 @@ func (bs *Bitswap) provideWorker(px process.Process) {
 		log.Debugw("Bitswap.ProvideWorker.Start", "ID", wid, "cid", k)
 		defer log.Debugw("Bitswap.ProvideWorker.End", "ID", wid, "cid", k)
 
-		ctx, cancel := context.WithTimeout(ctx, provideTimeout) // timeout ctx
+		ctx, cancel := context.WithTimeout(ctx, defaults.ProvideTimeout) // timeout ctx
 		defer cancel()
 
 		if err := bs.network.Provide(ctx, k); err != nil {
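Reviewer note: the histogram fed here observes seconds, not nanoseconds, which is why timeMetricsBuckets at the top of bitswap.go is {1, 10, 30, 60, 90, 120, 600}: the 10..120 range brackets the new minimum and maximum send timeouts. The measurement pattern taskWorker now uses, in isolation (a hypothetical helper; any go-metrics-interface Histogram works):

package example

import (
	"time"

	"github.com/ipfs/go-metrics-interface"
)

// observeDuration times op and records the elapsed time in seconds, matching
// how taskWorker feeds sendTimeHistogram above.
func observeDuration(h metrics.Histogram, op func()) {
	start := time.Now()
	op()
	h.Observe(time.Since(start).Seconds())
}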