Skip to content
This repository has been archived by the owner on Feb 1, 2023. It is now read-only.

Commit

Permalink
More stats, knobs and tunings (#514)
Browse files Browse the repository at this point in the history
* add configurability options for TaskWorkerCount and EngineTaskWorkerCount,
* add option for maximum outstanding bytes per peer
* add prometheus metrics for how long it takes to send messages, the number of pending and active tasks, and the number of pending and active block tasks
* add many of the unexported defaults to a defaults subpackage of the internal package

* feat: tighter send timeouts

1. Minimum timeout of 10s.
2. We add 2s due to latencies.
3. Minimum bandwidth of 100kbit/s.
4. Maximum message send time of 2min (way more time than necessary).

Co-authored-by: Adin Schmahmann <adin.schmahmann@gmail.com>
Co-authored-by: Steven Allen <steven@stebalien.com>
  • Loading branch information
3 people authored Aug 18, 2021
1 parent 5c2c537 commit 2b51297
Show file tree
Hide file tree
Showing 12 changed files with 351 additions and 86 deletions.
128 changes: 95 additions & 33 deletions bitswap.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
deciface "github.com/ipfs/go-bitswap/decision"
bsbpm "github.com/ipfs/go-bitswap/internal/blockpresencemanager"
"github.com/ipfs/go-bitswap/internal/decision"
"github.com/ipfs/go-bitswap/internal/defaults"
bsgetter "github.com/ipfs/go-bitswap/internal/getter"
bsmq "github.com/ipfs/go-bitswap/internal/messagequeue"
"github.com/ipfs/go-bitswap/internal/notifications"
Expand Down Expand Up @@ -42,15 +43,6 @@ var sflog = log.Desugar()

var _ exchange.SessionExchange = (*Bitswap)(nil)

const (
// these requests take at _least_ two minutes at the moment.
provideTimeout = time.Minute * 3
defaultProvSearchDelay = time.Second

// Number of concurrent workers in decision engine that process requests to the blockstore
defaulEngineBlockstoreWorkerCount = 128
)

var (
// HasBlockBufferSize is the buffer size of the channel for new blocks
// that need to be provided. They should get pulled over by the
Expand All @@ -62,6 +54,8 @@ var (

// the 1<<18+15 is to observe old file chunks that are 1<<18 + 14 in size
metricsBuckets = []float64{1 << 6, 1 << 10, 1 << 14, 1 << 18, 1<<18 + 15, 1 << 22}

timeMetricsBuckets = []float64{1, 10, 30, 60, 90, 120, 600}
)

// Option defines the functional option type that can be used to configure
Expand Down Expand Up @@ -100,6 +94,36 @@ func EngineBlockstoreWorkerCount(count int) Option {
}
}

// EngineTaskWorkerCount returns an Option that configures how many task
// worker threads are used inside the decision engine.
//
// The value is validated when the Option is built, not when it is applied:
// a count that is not strictly positive causes an immediate panic.
func EngineTaskWorkerCount(count int) Option {
	if count < 1 {
		panic(fmt.Sprintf("Engine task worker count is %d but must be > 0", count))
	}
	setEngineTaskWorkers := func(bs *Bitswap) {
		bs.engineTaskWorkerCount = count
	}
	return setEngineTaskWorkers
}

// TaskWorkerCount returns an Option that sets the total number of
// simultaneous threads sending outgoing messages.
//
// The value is validated when the Option is built, not when it is applied:
// a count that is not strictly positive causes an immediate panic.
func TaskWorkerCount(count int) Option {
	if count < 1 {
		panic(fmt.Sprintf("task worker count is %d but must be > 0", count))
	}
	setTaskWorkers := func(bs *Bitswap) {
		bs.taskWorkerCount = count
	}
	return setTaskWorkers
}

// MaxOutstandingBytesPerPeer returns an Option describing approximately how
// much work we are willing to have outstanding to a peer at any given time.
// Setting it to 0 will disable any limiting.
//
// The value is validated when the Option is built, not when it is applied:
// a negative count causes an immediate panic.
func MaxOutstandingBytesPerPeer(count int) Option {
	if count < 0 {
		panic(fmt.Sprintf("max outstanding bytes per peer is %d but must be >= 0", count))
	}
	setMaxOutstanding := func(bs *Bitswap) {
		bs.engineMaxOutstandingBytesPerPeer = count
	}
	return setMaxOutstanding
}

// SetSendDontHaves indicates what to do when the engine receives a want-block
// for a block that is not in the blockstore. Either
// - Send a DONT_HAVE message
Expand Down Expand Up @@ -147,6 +171,17 @@ func New(parent context.Context, network bsnet.BitSwapNetwork,
sentHistogram := metrics.NewCtx(ctx, "sent_all_blocks_bytes", "Histogram of blocks sent by"+
" this bitswap").Histogram(metricsBuckets)

sendTimeHistogram := metrics.NewCtx(ctx, "send_times", "Histogram of how long it takes to send messages"+
" in this bitswap").Histogram(timeMetricsBuckets)

pendingEngineGauge := metrics.NewCtx(ctx, "pending_tasks", "Total number of pending tasks").Gauge()

activeEngineGauge := metrics.NewCtx(ctx, "active_tasks", "Total number of active tasks").Gauge()

pendingBlocksGauge := metrics.NewCtx(ctx, "pending_block_tasks", "Total number of pending blockstore tasks").Gauge()

activeBlocksGauge := metrics.NewCtx(ctx, "active_block_tasks", "Total number of active blockstore tasks").Gauge()

px := process.WithTeardown(func() error {
return nil
})
Expand Down Expand Up @@ -192,26 +227,30 @@ func New(parent context.Context, network bsnet.BitSwapNetwork,
sm = bssm.New(ctx, sessionFactory, sim, sessionPeerManagerFactory, bpm, pm, notif, network.Self())

bs = &Bitswap{
blockstore: bstore,
network: network,
process: px,
newBlocks: make(chan cid.Cid, HasBlockBufferSize),
provideKeys: make(chan cid.Cid, provideKeysBufferSize),
pm: pm,
pqm: pqm,
sm: sm,
sim: sim,
notif: notif,
counters: new(counters),
dupMetric: dupHist,
allMetric: allHist,
sentHistogram: sentHistogram,
provideEnabled: true,
provSearchDelay: defaultProvSearchDelay,
rebroadcastDelay: delay.Fixed(time.Minute),
engineBstoreWorkerCount: defaulEngineBlockstoreWorkerCount,
engineSetSendDontHaves: true,
simulateDontHavesOnTimeout: true,
blockstore: bstore,
network: network,
process: px,
newBlocks: make(chan cid.Cid, HasBlockBufferSize),
provideKeys: make(chan cid.Cid, provideKeysBufferSize),
pm: pm,
pqm: pqm,
sm: sm,
sim: sim,
notif: notif,
counters: new(counters),
dupMetric: dupHist,
allMetric: allHist,
sentHistogram: sentHistogram,
sendTimeHistogram: sendTimeHistogram,
provideEnabled: true,
provSearchDelay: defaults.ProvSearchDelay,
rebroadcastDelay: delay.Fixed(time.Minute),
engineBstoreWorkerCount: defaults.BitswapEngineBlockstoreWorkerCount,
engineTaskWorkerCount: defaults.BitswapEngineTaskWorkerCount,
taskWorkerCount: defaults.BitswapTaskWorkerCount,
engineMaxOutstandingBytesPerPeer: defaults.BitswapMaxOutstandingBytesPerPeer,
engineSetSendDontHaves: true,
simulateDontHavesOnTimeout: true,
}

// apply functional options before starting and running bitswap
Expand All @@ -220,7 +259,20 @@ func New(parent context.Context, network bsnet.BitSwapNetwork,
}

// Set up decision engine
bs.engine = decision.NewEngine(bstore, bs.engineBstoreWorkerCount, network.ConnectionManager(), network.Self(), bs.engineScoreLedger)
bs.engine = decision.NewEngine(
ctx,
bstore,
bs.engineBstoreWorkerCount,
bs.engineTaskWorkerCount,
bs.engineMaxOutstandingBytesPerPeer,
network.ConnectionManager(),
network.Self(),
bs.engineScoreLedger,
pendingEngineGauge,
activeEngineGauge,
pendingBlocksGauge,
activeBlocksGauge,
)
bs.engine.SetSendDontHaves(bs.engineSetSendDontHaves)

bs.pqm.Startup()
Expand Down Expand Up @@ -277,9 +329,10 @@ type Bitswap struct {
counters *counters

// Metrics interface metrics
dupMetric metrics.Histogram
allMetric metrics.Histogram
sentHistogram metrics.Histogram
dupMetric metrics.Histogram
allMetric metrics.Histogram
sentHistogram metrics.Histogram
sendTimeHistogram metrics.Histogram

// External statistics interface
wiretap WireTap
Expand All @@ -303,6 +356,15 @@ type Bitswap struct {
// how many worker threads to start for decision engine blockstore worker
engineBstoreWorkerCount int

// how many worker threads to start for decision engine task worker
engineTaskWorkerCount int

// the total number of simultaneous threads sending outgoing messages
taskWorkerCount int

// the total amount of bytes that a peer should have outstanding; it is utilized by the decision engine
engineMaxOutstandingBytesPerPeer int

// the score ledger used by the decision engine
engineScoreLedger deciface.ScoreLedger

Expand Down
6 changes: 5 additions & 1 deletion bitswap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,11 @@ func PerformDistributionTest(t *testing.T, numInstances, numBlocks int) {
t.SkipNow()
}
net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
ig := testinstance.NewTestInstanceGenerator(net, nil, nil)
ig := testinstance.NewTestInstanceGenerator(net, nil, []bitswap.Option{
bitswap.TaskWorkerCount(5),
bitswap.EngineTaskWorkerCount(5),
bitswap.MaxOutstandingBytesPerPeer(1 << 20),
})
defer ig.Close()
bg := blocksutil.NewBlockGenerator()

Expand Down
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ require (
github.com/ipfs/go-ipfs-util v0.0.2
github.com/ipfs/go-log v1.0.5
github.com/ipfs/go-metrics-interface v0.0.1
github.com/ipfs/go-peertaskqueue v0.2.0
github.com/ipfs/go-peertaskqueue v0.4.0
github.com/jbenet/goprocess v0.1.4
github.com/libp2p/go-buffer-pool v0.0.2
github.com/libp2p/go-libp2p v0.14.3
Expand All @@ -28,6 +28,7 @@ require (
github.com/libp2p/go-msgio v0.0.6
github.com/multiformats/go-multiaddr v0.3.3
github.com/multiformats/go-multistream v0.2.2
github.com/stretchr/testify v1.7.0
go.uber.org/zap v1.16.0
)

Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -306,8 +306,8 @@ github.com/ipfs/go-log/v2 v2.1.3 h1:1iS3IU7aXRlbgUpN8yTTpJ53NXYjAe37vcI5+5nYrzk=
github.com/ipfs/go-log/v2 v2.1.3/go.mod h1:/8d0SH3Su5Ooc31QlL1WysJhvyOTDCjcCZ9Axpmri6g=
github.com/ipfs/go-metrics-interface v0.0.1 h1:j+cpbjYvu4R8zbleSs36gvB7jR+wsL2fGD6n0jO4kdg=
github.com/ipfs/go-metrics-interface v0.0.1/go.mod h1:6s6euYU4zowdslK0GKHmqaIZ3j/b/tL7HTWtJ4VPgWY=
github.com/ipfs/go-peertaskqueue v0.2.0 h1:2cSr7exUGKYyDeUyQ7P/nHPs9P7Ht/B+ROrpN1EJOjc=
github.com/ipfs/go-peertaskqueue v0.2.0/go.mod h1:5/eNrBEbtSKWCG+kQK8K8fGNixoYUnr+P7jivavs9lY=
github.com/ipfs/go-peertaskqueue v0.4.0 h1:x1hFgA4JOUJ3ntPfqLRu6v4k6kKL0p07r3RSg9JNyHI=
github.com/ipfs/go-peertaskqueue v0.4.0/go.mod h1:KL9F49hXJMoXCad8e5anivjN+kWdr+CyGcyh4K6doLc=
github.com/jackpal/gateway v1.0.5/go.mod h1:lTpwd4ACLXmpyiCTRtfiNyVnUmqT9RivzCDQetPfnjA=
github.com/jackpal/go-nat-pmp v1.0.1/go.mod h1:QPH045xvCAeXUZOxsnwmrtiCoxIr9eob+4orBN1SBKc=
github.com/jackpal/go-nat-pmp v1.0.2 h1:KzKSgb7qkJvOUTqYl9/Hg/me3pWgBmERKrTGD7BdWus=
Expand Down
33 changes: 24 additions & 9 deletions internal/decision/blockstoremanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,25 +8,36 @@ import (
blocks "github.com/ipfs/go-block-format"
cid "github.com/ipfs/go-cid"
bstore "github.com/ipfs/go-ipfs-blockstore"
"github.com/ipfs/go-metrics-interface"
process "github.com/jbenet/goprocess"
)

// blockstoreManager maintains a pool of workers that make requests to the blockstore.
type blockstoreManager struct {
bs bstore.Blockstore
workerCount int
jobs chan func()
px process.Process
bs bstore.Blockstore
workerCount int
jobs chan func()
px process.Process
pendingGauge metrics.Gauge
activeGauge metrics.Gauge
}

// newBlockstoreManager creates a new blockstoreManager with the given context
// and number of workers
func newBlockstoreManager(bs bstore.Blockstore, workerCount int) *blockstoreManager {
func newBlockstoreManager(
ctx context.Context,
bs bstore.Blockstore,
workerCount int,
pendingGauge metrics.Gauge,
activeGauge metrics.Gauge,
) *blockstoreManager {
return &blockstoreManager{
bs: bs,
workerCount: workerCount,
jobs: make(chan func()),
px: process.WithTeardown(func() error { return nil }),
bs: bs,
workerCount: workerCount,
jobs: make(chan func()),
px: process.WithTeardown(func() error { return nil }),
pendingGauge: pendingGauge,
activeGauge: activeGauge,
}
}

Expand All @@ -46,7 +57,10 @@ func (bsm *blockstoreManager) worker(px process.Process) {
case <-px.Closing():
return
case job := <-bsm.jobs:
bsm.pendingGauge.Dec()
bsm.activeGauge.Inc()
job()
bsm.activeGauge.Dec()
}
}
}
Expand All @@ -58,6 +72,7 @@ func (bsm *blockstoreManager) addJob(ctx context.Context, job func()) error {
case <-bsm.px.Closing():
return fmt.Errorf("shutting down")
case bsm.jobs <- job:
bsm.pendingGauge.Inc()
return nil
}
}
Expand Down
22 changes: 17 additions & 5 deletions internal/decision/blockstoremanager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

"github.com/ipfs/go-bitswap/internal/testutil"
cid "github.com/ipfs/go-cid"
"github.com/ipfs/go-metrics-interface"

blocks "github.com/ipfs/go-block-format"
ds "github.com/ipfs/go-datastore"
Expand All @@ -19,13 +20,23 @@ import (
process "github.com/jbenet/goprocess"
)

// newBlockstoreManagerForTesting is a test helper that wraps
// newBlockstoreManager. It creates the pending and active blockstore task
// gauges from ctx and passes them along with the caller's ctx, blockstore
// and worker count, so tests do not have to build the gauges themselves.
func newBlockstoreManagerForTesting(
	ctx context.Context,
	bs blockstore.Blockstore,
	workerCount int,
) *blockstoreManager {
	// The pending gauge is created first and the active gauge second,
	// matching the argument order of newBlockstoreManager.
	pendingCreator := metrics.NewCtx(ctx, "pending_block_tasks", "Total number of pending blockstore tasks")
	pending := pendingCreator.Gauge()

	activeCreator := metrics.NewCtx(ctx, "active_block_tasks", "Total number of active blockstore tasks")
	active := activeCreator.Gauge()

	return newBlockstoreManager(ctx, bs, workerCount, pending, active)
}

func TestBlockstoreManagerNotFoundKey(t *testing.T) {
ctx := context.Background()
bsdelay := delay.Fixed(3 * time.Millisecond)
dstore := ds_sync.MutexWrap(delayed.New(ds.NewMapDatastore(), bsdelay))
bstore := blockstore.NewBlockstore(ds_sync.MutexWrap(dstore))

bsm := newBlockstoreManager(bstore, 5)
bsm := newBlockstoreManagerForTesting(ctx, bstore, 5)
bsm.start(process.WithTeardown(func() error { return nil }))

cids := testutil.GenerateCids(4)
Expand Down Expand Up @@ -64,7 +75,7 @@ func TestBlockstoreManager(t *testing.T) {
dstore := ds_sync.MutexWrap(delayed.New(ds.NewMapDatastore(), bsdelay))
bstore := blockstore.NewBlockstore(ds_sync.MutexWrap(dstore))

bsm := newBlockstoreManager(bstore, 5)
bsm := newBlockstoreManagerForTesting(ctx, bstore, 5)
bsm.start(process.WithTeardown(func() error { return nil }))

exp := make(map[cid.Cid]blocks.Block)
Expand Down Expand Up @@ -148,7 +159,7 @@ func TestBlockstoreManagerConcurrency(t *testing.T) {
bstore := blockstore.NewBlockstore(ds_sync.MutexWrap(dstore))

workerCount := 5
bsm := newBlockstoreManager(bstore, workerCount)
bsm := newBlockstoreManagerForTesting(ctx, bstore, workerCount)
bsm.start(process.WithTeardown(func() error { return nil }))

blkSize := int64(8 * 1024)
Expand Down Expand Up @@ -190,7 +201,7 @@ func TestBlockstoreManagerClose(t *testing.T) {
dstore := ds_sync.MutexWrap(delayed.New(ds.NewMapDatastore(), bsdelay))
bstore := blockstore.NewBlockstore(ds_sync.MutexWrap(dstore))

bsm := newBlockstoreManager(bstore, 3)
bsm := newBlockstoreManagerForTesting(ctx, bstore, 3)
px := process.WithTeardown(func() error { return nil })
bsm.start(px)

Expand Down Expand Up @@ -229,7 +240,8 @@ func TestBlockstoreManagerCtxDone(t *testing.T) {
underlyingBstore := blockstore.NewBlockstore(underlyingDstore)
bstore := blockstore.NewBlockstore(dstore)

bsm := newBlockstoreManager(bstore, 3)
ctx := context.Background()
bsm := newBlockstoreManagerForTesting(ctx, bstore, 3)
proc := process.WithTeardown(func() error { return nil })
bsm.start(proc)

Expand Down
Loading

0 comments on commit 2b51297

Please sign in to comment.