Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix scheduler token bucket waiting #982

Merged
merged 3 commits into from
Jun 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 10 additions & 2 deletions pkg/network/p2p/neighbor.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package p2p
import (
"context"
"sync"
"sync/atomic"
"time"

"github.com/libp2p/go-libp2p/core/protocol"
Expand All @@ -15,7 +16,8 @@ import (
)

const (
	// NeighborsSendQueueSize is the capacity of a neighbor's outgoing packet queue.
	NeighborsSendQueueSize = 20_000
	// DroppedPacketDisconnectThreshold is the number of consecutively dropped
	// packets after which a non-reading neighbor is disconnected.
	DroppedPacketDisconnectThreshold = 100
)

type queuedPacket struct {
Expand All @@ -31,7 +33,8 @@ type (

// neighbor describes the established p2p connection to another peer.
type neighbor struct {
peer *network.Peer
peer *network.Peer
droppedPacketCounter atomic.Uint32

logger log.Logger

Expand Down Expand Up @@ -84,7 +87,12 @@ func (n *neighbor) Peer() *network.Peer {
// Enqueue puts the given packet onto the neighbor's send queue. A successful
// enqueue resets the dropped-packet counter; if the queue is full, the packet
// is dropped and the neighbor is closed once the consecutive-drop threshold
// (DroppedPacketDisconnectThreshold) is reached.
func (n *neighbor) Enqueue(packet proto.Message, protocolID protocol.ID) {
	queued := &queuedPacket{protocolID: protocolID, packet: packet}

	select {
	case n.sendQueue <- queued:
		// Delivery succeeded, so the drop streak is broken.
		n.droppedPacketCounter.Store(0)
	default:
		// Drop a neighbor that does not read from the full queue.
		dropCount := n.droppedPacketCounter.Add(1)
		if dropCount >= DroppedPacketDisconnectThreshold {
			n.Close()
		}
		n.logger.LogWarn("Dropped packet due to SendQueue being full")
	}
}
Expand Down
5 changes: 5 additions & 0 deletions pkg/protocol/engine/blockdag/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package blockdag
import (
"github.com/iotaledger/hive.go/runtime/event"
"github.com/iotaledger/iota-core/pkg/protocol/engine/blocks"
iotago "github.com/iotaledger/iota.go/v4"
)

// Events is a collection of Tangle related Events.
Expand All @@ -19,6 +20,9 @@ type Events struct {
// MissingBlockAppended is triggered when a previously missing Block was appended.
MissingBlockAppended *event.Event1[*blocks.Block]

// BlockNotAppended is triggered when an incoming Block could not be successfully appended.
BlockNotAppended *event.Event1[iotago.BlockID]

// BlockInvalid is triggered when a Block is found to be invalid.
BlockInvalid *event.Event2[*blocks.Block, error]

Expand All @@ -32,6 +36,7 @@ var NewEvents = event.CreateGroupConstructor(func() (newEvents *Events) {
BlockSolid: event.New1[*blocks.Block](),
BlockMissing: event.New1[*blocks.Block](),
MissingBlockAppended: event.New1[*blocks.Block](),
BlockNotAppended: event.New1[iotago.BlockID](),
BlockInvalid: event.New2[*blocks.Block, error](),
}
})
1 change: 1 addition & 0 deletions pkg/protocol/engine/blockdag/inmemoryblockdag/blockdag.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ func NewProvider(opts ...options.Option[BlockDAG]) module.Provider[*engine.Engin

e.Events.PreSolidFilter.BlockPreAllowed.Hook(func(block *model.Block) {
if _, _, err := b.Append(block); err != nil {
b.events.BlockNotAppended.Trigger(block.ID())
b.LogError("failed to append block", "blockID", block.ID(), "issuer", block.ProtocolBlock().Header.IssuerID, "err", err)
}
}, event.WithWorkerPool(wp))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -371,7 +371,7 @@ func (b *BasicBuffer) ringInsert(v interface{}) *ring.Ring {
// waitTime returns how long the scheduler must wait before the token bucket
// (refilled at `rate` tokens per second) holds enough tokens to schedule
// `block`, based on the block's work score. Returns 0 when the bucket already
// has (or will immediately have) enough tokens.
func (b *BasicBuffer) waitTime(rate float64, block *blocks.Block) time.Duration {
	// Tokens still missing after accounting for what has accrued since the last schedule.
	tokensRequired := float64(block.WorkScore()) - (b.tokenBucket + rate*time.Since(b.lastScheduleTime).Seconds())

	// Convert the required wait (in seconds, as float) to a Duration WITHOUT
	// first truncating to whole seconds: time.Duration(x)*time.Second would
	// drop the fractional part, turning e.g. a 0.9s wait into 0.
	return lo.Max(0, time.Duration((tokensRequired/rate)*float64(time.Second)))
}

func (b *BasicBuffer) updateTokenBucket(rate float64, tokenBucketSize float64) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -370,10 +370,11 @@ loop:
case blockToSchedule = <-s.basicBuffer.blockChan:
currentAPI := s.apiProvider.CommittedAPI()
rate := currentAPI.ProtocolParameters().CongestionControlParameters().SchedulerRate
if waitTime := s.basicBuffer.waitTime(float64(rate), blockToSchedule); waitTime > 0 {
for waitTime := s.basicBuffer.waitTime(float64(rate), blockToSchedule); waitTime > 0; {
timer := time.NewTimer(waitTime)
<-timer.C
}

s.basicBuffer.updateTokenBucket(float64(rate), float64(currentAPI.MaxBlockWork()))

s.scheduleBasicBlock(blockToSchedule)
Expand Down
12 changes: 10 additions & 2 deletions pkg/protocol/engine/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -585,15 +585,23 @@ func (e *Engine) setupEvictionState() {

func (e *Engine) setupBlockRequester() {
e.Events.BlockRequester.LinkTo(e.BlockRequester.Events)

wp := e.Workers.CreatePool("BlockRequester", workerpool.WithWorkerCount(1)) // Using just 1 worker to avoid contention
// We need to hook to make sure that the request is created before the block arrives to avoid a race condition
// where we try to delete the request again before it is created. Thus, continuing to request forever.
e.Events.BlockDAG.BlockMissing.Hook(func(block *blocks.Block) {
e.BlockRequester.StartTicker(block.ID())
})

e.Events.BlockDAG.MissingBlockAppended.Hook(func(block *blocks.Block) {
e.BlockRequester.StopTicker(block.ID())
}, event.WithWorkerPool(e.Workers.CreatePool("BlockRequester", workerpool.WithWorkerCount(1)))) // Using just 1 worker to avoid contention
}, event.WithWorkerPool(wp))

// Remove the block from the ticker if it failed to be appended.
// It's executed for all blocks to avoid locking twice:
// once to check if the block has the ticker and then again to remove it.
e.Events.BlockDAG.BlockNotAppended.Hook(func(blockID iotago.BlockID) {
e.BlockRequester.StopTicker(blockID)
}, event.WithWorkerPool(wp))
}

func (e *Engine) setupPruning() {
Expand Down
Loading