Skip to content
This repository has been archived by the owner on Feb 1, 2023. It is now read-only.

configurable target message size #546

Merged
merged 1 commit into from
Dec 14, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions bitswap.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,12 @@ func SetSimulateDontHavesOnTimeout(send bool) Option {
}
}

func WithTargetMessageSize(tms int) Option {
return func(bs *Bitswap) {
bs.engineTargetMessageSize = tms
}
}

type TaskInfo = decision.TaskInfo
type TaskComparator = decision.TaskComparator

Expand Down Expand Up @@ -259,6 +265,7 @@ func New(parent context.Context, network bsnet.BitSwapNetwork,
engineTaskWorkerCount: defaults.BitswapEngineTaskWorkerCount,
taskWorkerCount: defaults.BitswapTaskWorkerCount,
engineMaxOutstandingBytesPerPeer: defaults.BitswapMaxOutstandingBytesPerPeer,
engineTargetMessageSize: defaults.BitswapEngineTargetMessageSize,
engineSetSendDontHaves: true,
simulateDontHavesOnTimeout: true,
}
Expand All @@ -283,6 +290,7 @@ func New(parent context.Context, network bsnet.BitSwapNetwork,
pendingBlocksGauge,
activeBlocksGauge,
decision.WithTaskComparator(bs.taskComparator),
decision.WithTargetMessageSize(bs.engineTargetMessageSize),
)
bs.engine.SetSendDontHaves(bs.engineSetSendDontHaves)

Expand Down Expand Up @@ -379,6 +387,9 @@ type Bitswap struct {
// the score ledger used by the decision engine
engineScoreLedger deciface.ScoreLedger

// target message size setting for engines peer task queue
engineTargetMessageSize int

// indicates what to do when the engine receives a want-block for a block that
// is not in the blockstore. Either send DONT_HAVE or do nothing.
// This is used to simulate older versions of bitswap that did nothing instead of sending back a DONT_HAVE.
Expand Down
17 changes: 13 additions & 4 deletions internal/decision/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ const (
// targetMessageSize is the ideal size of the batched payload. We try to
// pop this much data off the request queue, but it may be a little more
// or less depending on what's in the queue.
targetMessageSize = 16 * 1024
defaultTargetMessageSize = 16 * 1024
// tagFormat is the tag given to peers associated an engine
tagFormat = "bs-engine-%s-%s"

Expand Down Expand Up @@ -159,6 +159,8 @@ type Engine struct {
taskWorkerLock sync.Mutex
taskWorkerCount int

targetMessageSize int

// maxBlockSizeReplaceHasWithBlock is the maximum size of the block in
// bytes up to which we will replace a want-have with a want-block
maxBlockSizeReplaceHasWithBlock int
Expand Down Expand Up @@ -207,6 +209,12 @@ func WithTaskComparator(comparator TaskComparator) Option {
}
}

func WithTargetMessageSize(size int) Option {
return func(e *Engine) {
e.targetMessageSize = size
}
}

// wrapTaskComparator wraps a TaskComparator so it can be used as a QueueTaskComparator
func wrapTaskComparator(tc TaskComparator) peertask.QueueTaskComparator {
return func(a, b *peertask.QueueTask) bool {
Expand Down Expand Up @@ -302,6 +310,7 @@ func newEngine(
peerLedger: newPeerLedger(),
pendingGauge: pendingEngineGauge,
activeGauge: activeEngineGauge,
targetMessageSize: defaultTargetMessageSize,
}
e.tagQueued = fmt.Sprintf(tagFormat, "queued", uuid.New().String())
e.tagUseful = fmt.Sprintf(tagFormat, "useful", uuid.New().String())
Expand Down Expand Up @@ -450,21 +459,21 @@ func (e *Engine) taskWorkerExit() {
func (e *Engine) nextEnvelope(ctx context.Context) (*Envelope, error) {
for {
// Pop some tasks off the request queue
p, nextTasks, pendingBytes := e.peerRequestQueue.PopTasks(targetMessageSize)
p, nextTasks, pendingBytes := e.peerRequestQueue.PopTasks(e.targetMessageSize)
e.updateMetrics()
for len(nextTasks) == 0 {
select {
case <-ctx.Done():
return nil, ctx.Err()
case <-e.workSignal:
p, nextTasks, pendingBytes = e.peerRequestQueue.PopTasks(targetMessageSize)
p, nextTasks, pendingBytes = e.peerRequestQueue.PopTasks(e.targetMessageSize)
e.updateMetrics()
case <-e.ticker.C:
// When a task is cancelled, the queue may be "frozen" for a
// period of time. We periodically "thaw" the queue to make
// sure it doesn't get stuck in a frozen state.
e.peerRequestQueue.ThawRound()
p, nextTasks, pendingBytes = e.peerRequestQueue.PopTasks(targetMessageSize)
p, nextTasks, pendingBytes = e.peerRequestQueue.PopTasks(e.targetMessageSize)
e.updateMetrics()
}
}
Expand Down
2 changes: 2 additions & 0 deletions internal/defaults/defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,6 @@ const (
BitswapEngineTaskWorkerCount = 8
// the total amount of bytes that a peer should have outstanding, it is utilized by the decision engine
BitswapMaxOutstandingBytesPerPeer = 1 << 20
// the number of bytes we attempt to make each outgoing bitswap message
BitswapEngineTargetMessageSize = 16 * 1024
)