Commit 58b9424: merge main
stevenlanders committed Jan 3, 2024
2 parents 02d1478 + 778330b
Showing 17 changed files with 164 additions and 341 deletions.
5 changes: 3 additions & 2 deletions .github/workflows/tests.yml
@@ -38,7 +38,8 @@ jobs:
- name: Run Go Tests
run: |
NUM_SPLIT=20
make test-group-${{matrix.part}} NUM_SPLIT=20
make split-test-packages
make test-group-${{matrix.part}}
- name: Upload coverage artifact
uses: actions/upload-artifact@v3
@@ -99,7 +100,7 @@ jobs:
with:
max_attempts: 2
retry_on: error
timeout_seconds: 30
timeout_seconds: 60
command: |
jobs=$(curl https://api.github.com/repos/${{ github.repository }}/actions/runs/${{ github.run_id }}/jobs)
job_statuses=$(echo "$jobs" | jq -r '.jobs[] | .conclusion')
4 changes: 2 additions & 2 deletions Makefile
@@ -345,6 +345,6 @@ $(BUILDDIR)/packages.txt:$(GO_TEST_FILES) $(BUILDDIR)
go list -f "{{ if (or .TestGoFiles .XTestGoFiles) }}{{ .ImportPath }}{{ end }}" ./... | sort > $@

split-test-packages:$(BUILDDIR)/packages.txt
split -d -n l/$(NUM_SPLIT) $< $<.
split -d -l $(NUM_SPLIT) $< $<.
test-group-%:split-test-packages
cat $(BUILDDIR)/packages.txt.$* | xargs go test -mod=readonly -timeout=10m -race -covermode=atomic -coverprofile=$*.profile.out
cat $(BUILDDIR)/packages.txt.$* | xargs go test -mod=readonly -timeout=15m -race -covermode=atomic -coverprofile=$*.profile.out
4 changes: 2 additions & 2 deletions config/config.go
@@ -1383,9 +1383,9 @@ func DefaultSelfRemediationConfig() *SelfRemediationConfig {
P2pNoPeersRestarWindowSeconds: 0,
StatesyncNoPeersRestartWindowSeconds: 0,
BlocksBehindThreshold: 0,
BlocksBehindCheckIntervalSeconds: 30,
BlocksBehindCheckIntervalSeconds: 60,
// 30 minutes
RestartCooldownSeconds: 1800,
RestartCooldownSeconds: 600,
}
}
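The new defaults check how far the node has fallen behind once a minute instead of every 30 seconds, and allow a self-restart as often as every 600 seconds (10 minutes) instead of every 1800 (30 minutes). Below is a minimal, self-contained sketch of the touched fields; the struct is a simplified mirror of the real SelfRemediationConfig (field names come from the diff, while the integer types and the "zero disables the check" reading are assumptions, not the actual config/config.go code).

```go
package main

import "fmt"

// Simplified mirror of the fields touched above; the real SelfRemediationConfig
// in config/config.go has more fields and possibly different integer types.
type SelfRemediationConfig struct {
	BlocksBehindThreshold            uint64 // 0 presumably leaves the blocks-behind check disabled
	BlocksBehindCheckIntervalSeconds uint64
	RestartCooldownSeconds           uint64
}

func defaultSelfRemediationConfig() *SelfRemediationConfig {
	return &SelfRemediationConfig{
		BlocksBehindThreshold:            0,
		BlocksBehindCheckIntervalSeconds: 60,  // was 30 before this commit
		RestartCooldownSeconds:           600, // 10 minutes; was 1800 (30 minutes)
	}
}

func main() {
	cfg := defaultSelfRemediationConfig()
	fmt.Printf("check every %ds, restart cooldown %ds\n",
		cfg.BlocksBehindCheckIntervalSeconds, cfg.RestartCooldownSeconds)
}
```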

2 changes: 1 addition & 1 deletion go.mod
@@ -1,6 +1,6 @@
module github.com/tendermint/tendermint

go 1.17
go 1.19

require (
github.com/BurntSushi/toml v1.1.0
225 changes: 0 additions & 225 deletions go.sum

Large diffs are not rendered by default.

148 changes: 95 additions & 53 deletions internal/blocksync/pool.go
@@ -4,7 +4,6 @@ import (
"context"
"errors"
"fmt"
"github.com/tendermint/tendermint/internal/p2p"
"math"
"math/rand"
"sort"
@@ -13,6 +12,7 @@ import (
"time"

"github.com/tendermint/tendermint/internal/libs/flowrate"
"github.com/tendermint/tendermint/internal/p2p"
"github.com/tendermint/tendermint/libs/log"
"github.com/tendermint/tendermint/libs/service"
"github.com/tendermint/tendermint/types"
@@ -31,7 +31,7 @@ eg, L = latency = 0.1s
*/

const (
requestInterval = 2 * time.Millisecond
requestInterval = 10 * time.Millisecond
inactiveSleepInterval = 1 * time.Second
maxTotalRequesters = 600
maxPeerErrBuffer = 1000
@@ -48,9 +48,13 @@ const (

// Maximum difference between current and new block's height.
maxDiffBetweenCurrentAndReceivedBlockHeight = 100

// Used to indicate the reason of the redo
PeerRemoved RetryReason = "PeerRemoved"
BadBlock RetryReason = "BadBlock"
)

var peerTimeout = 15 * time.Second // not const so we can override with tests
var peerTimeout = 10 * time.Second // not const so we can override with tests

/*
Peers self report their heights when we join the block pool.
@@ -187,7 +191,7 @@ func (pool *BlockPool) removeTimedoutPeers() {
}

if peer.didTimeout {
pool.removePeer(peer.id)
pool.removePeer(peer.id, true)
}
}
}
@@ -206,8 +210,8 @@ func (pool *BlockPool) IsCaughtUp() bool {
pool.mtx.RLock()
defer pool.mtx.RUnlock()

// Need at least 1 peer to be considered caught up.
if len(pool.peers) == 0 {
// Need at least 2 peers to be considered caught up.
if len(pool.peers) <= 1 {
return false
}
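A small stand-in illustrating the strengthened guard: with one peer or none, the pool now refuses to consider itself caught up, since a single peer's self-reported height is weak evidence. Only the minimum-peer-count rule is taken from the diff; the height comparison and the rest of this sketch are simplified assumptions, not the real IsCaughtUp body.

```go
package main

import "fmt"

// caughtUp is a simplified stand-in for BlockPool.IsCaughtUp. The peer-count
// check mirrors the diff above; the height comparison is an illustrative
// approximation of the remainder of the method.
func caughtUp(numPeers int, ourHeight, maxPeerHeight int64) bool {
	// Need at least 2 peers to be considered caught up (previously 1 was enough).
	if numPeers <= 1 {
		return false
	}
	return ourHeight >= maxPeerHeight
}

func main() {
	fmt.Println(caughtUp(1, 100, 100)) // false: a lone peer is not trusted
	fmt.Println(caughtUp(3, 100, 100)) // true
}
```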

@@ -277,8 +281,13 @@ func (pool *BlockPool) RedoRequest(height int64) types.NodeID {
request := pool.requesters[height]
peerID := request.getPeerID()
if peerID != types.NodeID("") {
// RemovePeer will redo all requesters associated with this peer.
pool.removePeer(peerID)
pool.removePeer(peerID, false)
}
// Redo all requesters associated with this peer.
for _, requester := range pool.requesters {
if requester.getPeerID() == peerID {
requester.redo(peerID, BadBlock)
}
}
return peerID
}
@@ -376,13 +385,15 @@ func (pool *BlockPool) RemovePeer(peerID types.NodeID) {
pool.mtx.Lock()
defer pool.mtx.Unlock()

pool.removePeer(peerID)
pool.removePeer(peerID, true)
}

func (pool *BlockPool) removePeer(peerID types.NodeID) {
for _, requester := range pool.requesters {
if requester.getPeerID() == peerID {
requester.redo(peerID)
func (pool *BlockPool) removePeer(peerID types.NodeID, redo bool) {
if redo {
for _, requester := range pool.requesters {
if requester.getPeerID() == peerID {
requester.redo(peerID, PeerRemoved)
}
}
}

@@ -437,22 +448,10 @@ func (pool *BlockPool) pickIncrAvailablePeer(height int64) *bpPeer {
sortedPeers := pool.getSortedPeers(pool.peers)
var goodPeers []types.NodeID
// Remove peers with 0 score and shuffle list
for _, peer := range sortedPeers {
// We only want to work with peers that are ready & connected (not dialing)
if pool.peerManager.State(peer) == "ready,connected" {
goodPeers = append(goodPeers, peer)
}
if pool.peerManager.Score(peer) == 0 {
break
}
}
rand.Seed(time.Now().UnixNano())
rand.Shuffle(len(goodPeers), func(i, j int) { goodPeers[i], goodPeers[j] = goodPeers[j], goodPeers[i] })

for _, nodeId := range sortedPeers {
peer := pool.peers[nodeId]
if peer.didTimeout {
pool.removePeer(peer.id)
pool.removePeer(peer.id, true)
continue
}
if peer.numPending >= maxPendingRequestsPerPeer {
@@ -461,6 +460,25 @@ func (pool *BlockPool) pickIncrAvailablePeer(height int64) *bpPeer {
if height < peer.base || height > peer.height {
continue
}
// We only want to work with peers that are ready & connected (not dialing)
if pool.peerManager.State(nodeId) == "ready,connected" {
goodPeers = append(goodPeers, nodeId)
}

// Skip the ones with zero score to avoid connecting to bad peers
if pool.peerManager.Score(nodeId) <= 0 {
break
}
}

// randomly pick one
if len(goodPeers) > 0 {
rand.Seed(time.Now().UnixNano())
index := rand.Intn(len(goodPeers))
if index >= len(goodPeers) {
index = len(goodPeers) - 1
}
peer := pool.peers[goodPeers[index]]
peer.incrPending()
return peer
}
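The selection logic above now collects every eligible peer (ready and connected, within height range, walking the score-sorted list down to the first zero-score entry) and then picks one uniformly at random, instead of always taking the first match. A self-contained sketch of that strategy follows, using simplified stand-in types rather than the pool's real peer and peerManager types.

```go
package main

import (
	"fmt"
	"math/rand"
)

// peerInfo is a stand-in for the pool's peer bookkeeping; the real code asks
// the peer manager for State and Score instead of storing them here.
type peerInfo struct {
	ID    string
	State string // e.g. "ready,connected"
	Score int
	Base  int64
	Top   int64
}

// pickRandomGoodPeer mirrors the shape of pickIncrAvailablePeer above;
// peers are assumed to be sorted by descending score.
func pickRandomGoodPeer(sorted []peerInfo, height int64) (peerInfo, bool) {
	var good []peerInfo
	for _, p := range sorted {
		if height < p.Base || height > p.Top {
			continue
		}
		// Only work with peers that are ready & connected (not dialing).
		if p.State == "ready,connected" {
			good = append(good, p)
		}
		// Sorted by score, so once we hit a zero-score peer nothing better follows.
		if p.Score <= 0 {
			break
		}
	}
	if len(good) == 0 {
		return peerInfo{}, false
	}
	return good[rand.Intn(len(good))], true
}

func main() {
	peers := []peerInfo{
		{ID: "a", State: "ready,connected", Score: 10, Base: 0, Top: 1000},
		{ID: "b", State: "dialing", Score: 8, Base: 0, Top: 1000},
		{ID: "c", State: "ready,connected", Score: 5, Base: 0, Top: 1000},
	}
	p, ok := pickRandomGoodPeer(peers, 500)
	fmt.Println(p.ID, ok) // "a" or "c", never the dialing peer
}
```

Note that rand.Intn(n) already returns a value in [0, n), so the extra index clamp in the diff is defensive rather than required.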
@@ -606,28 +624,35 @@ func (peer *bpPeer) onTimeout() {

type bpRequester struct {
service.BaseService
logger log.Logger
pool *BlockPool
height int64
gotBlockCh chan struct{}
redoCh chan types.NodeID // redo may send multitime, add peerId to identify repeat
logger log.Logger
pool *BlockPool
height int64
gotBlockCh chan struct{}
redoCh chan RedoOp // redo may send multitime, add peerId to identify repeat
timeoutTicker *time.Ticker
mtx sync.Mutex
peerID types.NodeID
block *types.Block
extCommit *types.ExtendedCommit
}

type RetryReason string

mtx sync.Mutex
peerID types.NodeID
block *types.Block
extCommit *types.ExtendedCommit
type RedoOp struct {
PeerId types.NodeID
Reason RetryReason
}

func newBPRequester(logger log.Logger, pool *BlockPool, height int64) *bpRequester {
bpr := &bpRequester{
logger: pool.logger,
pool: pool,
height: height,
gotBlockCh: make(chan struct{}, 1),
redoCh: make(chan types.NodeID, 1),

peerID: "",
block: nil,
logger: pool.logger,
pool: pool,
height: height,
gotBlockCh: make(chan struct{}, 1),
redoCh: make(chan RedoOp, 1),
timeoutTicker: time.NewTicker(peerTimeout),
peerID: "",
block: nil,
}
bpr.BaseService = *service.NewBaseService(logger, "bpRequester", bpr)
return bpr
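The requester now carries a retry reason with every redo request. Below is a minimal, standalone illustration of that signalling: a buffered channel of RedoOp values and a non-blocking send, so redo calls issued while one is already pending are simply dropped. The type and constant names mirror the diff; the plain string peer ID and the main function are illustrative only.

```go
package main

import "fmt"

type RetryReason string

const (
	PeerRemoved RetryReason = "PeerRemoved"
	BadBlock    RetryReason = "BadBlock"
)

type RedoOp struct {
	PeerId string
	Reason RetryReason
}

// redo mirrors bpRequester.redo: it never blocks, and drops the op when a
// redo is already queued.
func redo(ch chan RedoOp, peerID string, reason RetryReason) {
	select {
	case ch <- RedoOp{PeerId: peerID, Reason: reason}:
	default:
	}
}

func main() {
	ch := make(chan RedoOp, 1)
	redo(ch, "peer-1", BadBlock)
	redo(ch, "peer-1", PeerRemoved) // dropped: a redo is already pending
	fmt.Println(<-ch)               // {peer-1 BadBlock}
}
```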
@@ -679,25 +704,34 @@ func (bpr *bpRequester) getPeerID() types.NodeID {
}

// This is called from the requestRoutine, upon redo().
func (bpr *bpRequester) reset() {
func (bpr *bpRequester) reset(force bool) bool {
bpr.mtx.Lock()
defer bpr.mtx.Unlock()

if bpr.block != nil && !force {
// Do not reset if we already have a block
return false
}

if bpr.block != nil {
atomic.AddInt32(&bpr.pool.numPending, 1)
}

bpr.peerID = ""
bpr.block = nil
bpr.extCommit = nil
return true
}

// Tells bpRequester to pick another peer and try again.
// NOTE: Nonblocking, and does nothing if another redo
// was already requested.
func (bpr *bpRequester) redo(peerID types.NodeID) {
func (bpr *bpRequester) redo(peerID types.NodeID, retryReason RetryReason) {
select {
case bpr.redoCh <- peerID:
case bpr.redoCh <- RedoOp{
PeerId: peerID,
Reason: retryReason,
}:
default:
}
}
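reset now reports whether the requester's state was actually cleared, and an already-received block survives unless the caller forces the reset (the bad-block path). A compact sketch of that decision follows, omitting the locking and the pool.numPending adjustment the real method performs.

```go
package main

import "fmt"

// requester is a stripped-down stand-in for bpRequester.
type requester struct {
	block *string // stand-in for *types.Block
}

// reset mirrors the force semantics above: an existing block is only discarded
// when force is true, i.e. when the block we already have turned out to be bad.
func (r *requester) reset(force bool) bool {
	if r.block != nil && !force {
		return false // keep the block; the request routine stays in WAIT_LOOP
	}
	r.block = nil
	return true // state cleared; the request routine re-requests (OUTER_LOOP)
}

func main() {
	b := "block@42"
	r := &requester{block: &b}
	fmt.Println(r.reset(false)) // false: peer removed or timed out, block is kept
	fmt.Println(r.reset(true))  // true: bad block, refetch from another peer
}
```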
@@ -711,7 +745,8 @@ OUTER_LOOP:
var peer *bpPeer
PICK_PEER_LOOP:
for {
if !bpr.IsRunning() || !bpr.pool.IsRunning() {
if !bpr.IsRunning() || !bpr.pool.IsRunning() || ctx.Err() != nil {
bpr.timeoutTicker.Stop()
return
}
if ctx.Err() != nil {
@@ -734,21 +769,28 @@

// Send request and wait.
bpr.pool.sendRequest(bpr.height, peer.id)
bpr.timeoutTicker.Reset(peerTimeout)
WAIT_LOOP:
for {
select {
case <-ctx.Done():
bpr.timeoutTicker.Stop()
return
case peerID := <-bpr.redoCh:
if peerID == bpr.peerID {
bpr.reset()
case redoOp := <-bpr.redoCh:
// if we don't have an existing block or this is a bad block
// we should reset the previous block
if bpr.reset(redoOp.Reason == BadBlock) {
continue OUTER_LOOP
}
continue WAIT_LOOP
case <-bpr.timeoutTicker.C:
if bpr.reset(false) {
continue OUTER_LOOP
} else {
continue WAIT_LOOP
}
case <-bpr.gotBlockCh:
// We got a block!
// Continue the for-loop and wait til Quit.
// Continue the for-loop and wait til Quit
// in case we need to reset the block
continue WAIT_LOOP
}
}
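The request routine also gains a per-request timeout: the ticker is re-armed right after each request is sent, stopped on shutdown, and a tick triggers a retry only if no block has arrived yet. A tiny, self-contained illustration of that pattern follows; the 100 ms duration and the channels are stand-ins, not the pool's real 10-second peerTimeout or gotBlockCh.

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	peerTimeout := 100 * time.Millisecond // stand-in for the 10s peerTimeout above
	timeout := time.NewTicker(peerTimeout)
	defer timeout.Stop()

	gotBlock := make(chan struct{}) // never written to: simulates a silent peer

	// "Send the request", then re-arm the per-request timeout, as the new
	// requestRoutine does right after pool.sendRequest.
	timeout.Reset(peerTimeout)

	select {
	case <-gotBlock:
		fmt.Println("block received before the deadline")
	case <-timeout.C:
		fmt.Println("peer timed out; reset the requester and try another peer")
	}
}
```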