diff --git a/bitswap.go b/bitswap.go index fe0c4855..c7875307 100644 --- a/bitswap.go +++ b/bitswap.go @@ -148,6 +148,12 @@ func SetSimulateDontHavesOnTimeout(send bool) Option { } } +func WithTargetMessageSize(tms int) Option { + return func(bs *Bitswap) { + bs.engineTargetMessageSize = tms + } +} + type TaskInfo = decision.TaskInfo type TaskComparator = decision.TaskComparator @@ -259,6 +265,7 @@ func New(parent context.Context, network bsnet.BitSwapNetwork, engineTaskWorkerCount: defaults.BitswapEngineTaskWorkerCount, taskWorkerCount: defaults.BitswapTaskWorkerCount, engineMaxOutstandingBytesPerPeer: defaults.BitswapMaxOutstandingBytesPerPeer, + engineTargetMessageSize: defaults.BitswapEngineTargetMessageSize, engineSetSendDontHaves: true, simulateDontHavesOnTimeout: true, } @@ -283,6 +290,7 @@ func New(parent context.Context, network bsnet.BitSwapNetwork, pendingBlocksGauge, activeBlocksGauge, decision.WithTaskComparator(bs.taskComparator), + decision.WithTargetMessageSize(bs.engineTargetMessageSize), ) bs.engine.SetSendDontHaves(bs.engineSetSendDontHaves) @@ -379,6 +387,9 @@ type Bitswap struct { // the score ledger used by the decision engine engineScoreLedger deciface.ScoreLedger + // target message size setting for engines peer task queue + engineTargetMessageSize int + // indicates what to do when the engine receives a want-block for a block that // is not in the blockstore. Either send DONT_HAVE or do nothing. // This is used to simulate older versions of bitswap that did nothing instead of sending back a DONT_HAVE. diff --git a/internal/decision/engine.go b/internal/decision/engine.go index abb0bcd6..24e45f16 100644 --- a/internal/decision/engine.go +++ b/internal/decision/engine.go @@ -64,7 +64,7 @@ const ( // targetMessageSize is the ideal size of the batched payload. We try to // pop this much data off the request queue, but it may be a little more // or less depending on what's in the queue. - targetMessageSize = 16 * 1024 + defaultTargetMessageSize = 16 * 1024 // tagFormat is the tag given to peers associated an engine tagFormat = "bs-engine-%s-%s" @@ -159,6 +159,8 @@ type Engine struct { taskWorkerLock sync.Mutex taskWorkerCount int + targetMessageSize int + // maxBlockSizeReplaceHasWithBlock is the maximum size of the block in // bytes up to which we will replace a want-have with a want-block maxBlockSizeReplaceHasWithBlock int @@ -207,6 +209,12 @@ func WithTaskComparator(comparator TaskComparator) Option { } } +func WithTargetMessageSize(size int) Option { + return func(e *Engine) { + e.targetMessageSize = size + } +} + // wrapTaskComparator wraps a TaskComparator so it can be used as a QueueTaskComparator func wrapTaskComparator(tc TaskComparator) peertask.QueueTaskComparator { return func(a, b *peertask.QueueTask) bool { @@ -302,6 +310,7 @@ func newEngine( peerLedger: newPeerLedger(), pendingGauge: pendingEngineGauge, activeGauge: activeEngineGauge, + targetMessageSize: defaultTargetMessageSize, } e.tagQueued = fmt.Sprintf(tagFormat, "queued", uuid.New().String()) e.tagUseful = fmt.Sprintf(tagFormat, "useful", uuid.New().String()) @@ -450,21 +459,21 @@ func (e *Engine) taskWorkerExit() { func (e *Engine) nextEnvelope(ctx context.Context) (*Envelope, error) { for { // Pop some tasks off the request queue - p, nextTasks, pendingBytes := e.peerRequestQueue.PopTasks(targetMessageSize) + p, nextTasks, pendingBytes := e.peerRequestQueue.PopTasks(e.targetMessageSize) e.updateMetrics() for len(nextTasks) == 0 { select { case <-ctx.Done(): return nil, ctx.Err() case <-e.workSignal: - p, nextTasks, pendingBytes = e.peerRequestQueue.PopTasks(targetMessageSize) + p, nextTasks, pendingBytes = e.peerRequestQueue.PopTasks(e.targetMessageSize) e.updateMetrics() case <-e.ticker.C: // When a task is cancelled, the queue may be "frozen" for a // period of time. We periodically "thaw" the queue to make // sure it doesn't get stuck in a frozen state. e.peerRequestQueue.ThawRound() - p, nextTasks, pendingBytes = e.peerRequestQueue.PopTasks(targetMessageSize) + p, nextTasks, pendingBytes = e.peerRequestQueue.PopTasks(e.targetMessageSize) e.updateMetrics() } } diff --git a/internal/defaults/defaults.go b/internal/defaults/defaults.go index 7237a996..54a9eaa6 100644 --- a/internal/defaults/defaults.go +++ b/internal/defaults/defaults.go @@ -17,4 +17,6 @@ const ( BitswapEngineTaskWorkerCount = 8 // the total amount of bytes that a peer should have outstanding, it is utilized by the decision engine BitswapMaxOutstandingBytesPerPeer = 1 << 20 + // the number of bytes we attempt to make each outgoing bitswap message + BitswapEngineTargetMessageSize = 16 * 1024 )