les: multiple server bugfixes #20079

Merged: 9 commits, Sep 17, 2019
3 changes: 0 additions & 3 deletions core/blockchain.go
@@ -2151,9 +2151,6 @@ func (bc *BlockChain) GetBlockHashesFromHash(hash common.Hash, max uint64) []com
//
// Note: ancestor == 0 returns the same block, 1 returns its parent and so on.
func (bc *BlockChain) GetAncestor(hash common.Hash, number, ancestor uint64, maxNonCanonical *uint64) (common.Hash, uint64) {
bc.chainmu.RLock()
defer bc.chainmu.RUnlock()

return bc.hc.GetAncestor(hash, number, ancestor, maxNonCanonical)
}

7 changes: 5 additions & 2 deletions core/headerchain.go
@@ -349,8 +349,11 @@ func (hc *HeaderChain) GetAncestor(hash common.Hash, number, ancestor uint64, ma
}
for ancestor != 0 {
if rawdb.ReadCanonicalHash(hc.chainDb, number) == hash {
number -= ancestor
return rawdb.ReadCanonicalHash(hc.chainDb, number), number
ancestorHash := rawdb.ReadCanonicalHash(hc.chainDb, number-ancestor)
if rawdb.ReadCanonicalHash(hc.chainDb, number) == hash {
Contributor:

I don't see what's different about line 353 and line 351. Why is the same operation performed again?

Contributor Author:

Because we are not locking the chain mutex. Re-reading the canonical hash ensures that the chain did not change between the two reads, so ancestorHash is indeed an ancestor of hash: both were found in the same version of the canonical chain. I hope I'm right in assuming that if a reorg happens, the newer hash will change first.
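
For clarity, the pattern described above amounts to roughly the following (a minimal sketch under the stated assumption that a reorg updates the newer canonical entries first; the standalone helper and its name are illustrative, not part of the actual diff):

```go
// Sketch of the optimistic, lock-free double read: look up the candidate
// ancestor, then re-read the original canonical hash and only trust the
// result if it is unchanged. If a reorg happened between the two reads,
// the check fails and the caller falls back to the slower path.
package sketch

import (
	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/core/rawdb"
	"github.com/ethereum/go-ethereum/ethdb"
)

// canonicalAncestor is a hypothetical helper: it returns the canonical
// ancestor of hash at the given depth, or false if the canonical chain
// changed between the two reads.
func canonicalAncestor(db ethdb.Reader, hash common.Hash, number, ancestor uint64) (common.Hash, bool) {
	// First read: the given hash must be canonical at its height.
	if rawdb.ReadCanonicalHash(db, number) != hash {
		return common.Hash{}, false
	}
	// Look up the would-be ancestor on the canonical chain.
	ancestorHash := rawdb.ReadCanonicalHash(db, number-ancestor)
	// Second read: if the canonical hash at this height changed, a reorg
	// happened in between and ancestorHash may belong to a different chain
	// version, so it is discarded.
	if rawdb.ReadCanonicalHash(db, number) != hash {
		return common.Hash{}, false
	}
	return ancestorHash, true
}
```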

Contributor:

Ah, I guess you're doing this to make up for the data race you introduced by dropping the mutex...
Might be good to revamp that old PR and make use of it in LES: #19319

Contributor Author:

I like that PR; we should definitely use it as a speed-up layer on top of the hash chain in the db. LES needs to access hashes and ancestors throughout the entire header chain though, and when we have to go to the db we should not hold the mutex, because the db access takes a long time and the mutex becomes a bottleneck (as we have seen in our tests). So I suggest first doing what I did in this PR and then replacing the hash chain accessors with a cached function in a subsequent PR based on your work. Caching the canonical hashes is a big and critical change and should not be jammed into a bugfix PR.
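
For illustration, a rough sketch of the follow-up idea discussed here: a canonical-hash cache in front of the database whose lock is only held around the in-memory map and never across the slow db read. All names are hypothetical, reorg invalidation is omitted, and none of this is part of the present PR:

```go
// Hypothetical cache layer: the mutex protects only the in-memory map, so a
// slow database lookup never blocks other readers. Invalidating stale entries
// after a reorg is deliberately left out of this sketch.
package sketch

import (
	"sync"

	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/core/rawdb"
	"github.com/ethereum/go-ethereum/ethdb"
)

type canonicalHashCache struct {
	db ethdb.Reader

	mu     sync.RWMutex
	hashes map[uint64]common.Hash
}

func newCanonicalHashCache(db ethdb.Reader) *canonicalHashCache {
	return &canonicalHashCache{db: db, hashes: make(map[uint64]common.Hash)}
}

// hash returns the canonical hash at the given height, consulting the cache
// first and falling back to the database outside of any lock.
func (c *canonicalHashCache) hash(number uint64) common.Hash {
	c.mu.RLock()
	h, ok := c.hashes[number]
	c.mu.RUnlock()
	if ok {
		return h
	}
	// Slow path: the database read happens without holding the lock.
	h = rawdb.ReadCanonicalHash(c.db, number)
	c.mu.Lock()
	c.hashes[number] = h
	c.mu.Unlock()
	return h
}
```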

number -= ancestor
return ancestorHash, number
}
}
if *maxNonCanonical == 0 {
return common.Hash{}, 0
16 changes: 9 additions & 7 deletions les/clientpool.go
@@ -33,7 +33,7 @@ import (
const (
negBalanceExpTC = time.Hour // time constant for exponentially reducing negative balance
fixedPointMultiplier = 0x1000000 // constant to convert logarithms to fixed point format
connectedBias = time.Minute // this bias is applied in favor of already connected clients in order to avoid kicking them out very soon
connectedBias = time.Minute * 5 // this bias is applied in favor of already connected clients in order to avoid kicking them out very soon
lazyQueueRefresh = time.Second * 10 // refresh period of the connected queue
)

@@ -366,12 +366,14 @@ func (f *clientPool) setLimits(count int, totalCap uint64) {

f.countLimit = count
f.capacityLimit = totalCap
now := mclock.Now()
f.connectedQueue.MultiPop(func(data interface{}, priority int64) bool {
c := data.(*clientInfo)
f.dropClient(c, now, true)
return f.connectedCapacity > f.capacityLimit || f.connectedQueue.Size() > f.countLimit
})
if f.connectedCapacity > f.capacityLimit || f.connectedQueue.Size() > f.countLimit {
now := mclock.Now()
f.connectedQueue.MultiPop(func(data interface{}, priority int64) bool {
c := data.(*clientInfo)
f.dropClient(c, now, true)
return f.connectedCapacity > f.capacityLimit || f.connectedQueue.Size() > f.countLimit
})
}
}

// requestCost feeds request cost after serving a request from the given peer.
43 changes: 40 additions & 3 deletions les/costtracker.go
@@ -28,6 +28,7 @@ import (
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/les/flowcontrol"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/metrics"
)

const makeCostStats = false // make request cost statistics during operation
@@ -87,7 +88,7 @@ const (
gfUsageTC = time.Second
gfRaiseTC = time.Second * 200
gfDropTC = time.Second * 50
gfDbKey = "_globalCostFactorV3"
gfDbKey = "_globalCostFactorV6"
rjl493456442 marked this conversation as resolved.
)

// costTracker is responsible for calculating costs and cost estimates on the
@@ -226,6 +227,9 @@ type reqInfo struct {
// servingTime is the CPU time corresponding to the actual processing of
// the request.
servingTime float64

// msgCode indicates the type of request.
msgCode uint64
}

// gfLoop starts an event loop which updates the global cost factor which is
@@ -269,11 +273,43 @@ func (ct *costTracker) gfLoop() {
for {
select {
case r := <-ct.reqInfoCh:
relCost := int64(factor * r.servingTime * 100 / r.avgTimeCost) // Convert the value to a percentage form

// Record more metrics if we are debugging
if metrics.EnabledExpensive {
switch r.msgCode {
case GetBlockHeadersMsg:
relativeCostHeaderHistogram.Update(relCost)
case GetBlockBodiesMsg:
relativeCostBodyHistogram.Update(relCost)
case GetReceiptsMsg:
relativeCostReceiptHistogram.Update(relCost)
case GetCodeMsg:
relativeCostCodeHistogram.Update(relCost)
case GetProofsV2Msg:
relativeCostProofHistogram.Update(relCost)
case GetHelperTrieProofsMsg:
relativeCostHelperProofHistogram.Update(relCost)
case SendTxV2Msg:
relativeCostSendTxHistogram.Update(relCost)
case GetTxStatusMsg:
relativeCostTxStatusHistogram.Update(relCost)
}
}
// SendTxV2 and GetTxStatus requests are two special cases.
// All other requests will only put pressure on the database, and
// the corresponding delay is relatively stable. While these two
// requests involve txpool query, which is usually unstable.
//
// TODO(rjl493456442) fixes this.
if r.msgCode == SendTxV2Msg || r.msgCode == GetTxStatusMsg {
continue
}
requestServedMeter.Mark(int64(r.servingTime))
requestServedTimer.Update(time.Duration(r.servingTime))
requestEstimatedMeter.Mark(int64(r.avgTimeCost / factor))
requestEstimatedTimer.Update(time.Duration(r.avgTimeCost / factor))
relativeCostHistogram.Update(int64(r.avgTimeCost / factor / r.servingTime))
relativeCostHistogram.Update(relCost)

now := mclock.Now()
dt := float64(now - expUpdate)
@@ -324,6 +360,7 @@ func (ct *costTracker) gfLoop() {
default:
}
}
globalFactorGauge.Update(int64(1000 * factor))
log.Debug("global cost factor updated", "factor", factor)
}
}
@@ -375,7 +412,7 @@ func (ct *costTracker) updateStats(code, amount, servingTime, realCost uint64) {
avg := reqAvgTimeCost[code]
avgTimeCost := avg.baseCost + amount*avg.reqCost
select {
case ct.reqInfoCh <- reqInfo{float64(avgTimeCost), float64(servingTime)}:
case ct.reqInfoCh <- reqInfo{float64(avgTimeCost), float64(servingTime), code}:
default:
}
if makeCostStats {
30 changes: 24 additions & 6 deletions les/metrics.go
@@ -60,6 +60,15 @@ var (
miscOutTxStatusPacketsMeter = metrics.NewRegisteredMeter("les/misc/out/packets/txStatus", nil)
miscOutTxStatusTrafficMeter = metrics.NewRegisteredMeter("les/misc/out/traffic/txStatus", nil)

miscServingTimeHeaderTimer = metrics.NewRegisteredTimer("les/misc/serve/header", nil)
miscServingTimeBodyTimer = metrics.NewRegisteredTimer("les/misc/serve/body", nil)
miscServingTimeCodeTimer = metrics.NewRegisteredTimer("les/misc/serve/code", nil)
miscServingTimeReceiptTimer = metrics.NewRegisteredTimer("les/misc/serve/receipt", nil)
miscServingTimeTrieProofTimer = metrics.NewRegisteredTimer("les/misc/serve/proof", nil)
miscServingTimeHelperTrieTimer = metrics.NewRegisteredTimer("les/misc/serve/helperTrie", nil)
miscServingTimeTxTimer = metrics.NewRegisteredTimer("les/misc/serve/txs", nil)
miscServingTimeTxStatusTimer = metrics.NewRegisteredTimer("les/misc/serve/txStatus", nil)

connectionTimer = metrics.NewRegisteredTimer("les/connection/duration", nil)
serverConnectionGauge = metrics.NewRegisteredGauge("les/connection/server", nil)
clientConnectionGauge = metrics.NewRegisteredGauge("les/connection/client", nil)
@@ -69,12 +78,21 @@ var (
totalConnectedGauge = metrics.NewRegisteredGauge("les/server/totalConnected", nil)
blockProcessingTimer = metrics.NewRegisteredTimer("les/server/blockProcessingTime", nil)

requestServedMeter = metrics.NewRegisteredMeter("les/server/req/avgServedTime", nil)
requestServedTimer = metrics.NewRegisteredTimer("les/server/req/servedTime", nil)
requestEstimatedMeter = metrics.NewRegisteredMeter("les/server/req/avgEstimatedTime", nil)
requestEstimatedTimer = metrics.NewRegisteredTimer("les/server/req/estimatedTime", nil)
relativeCostHistogram = metrics.NewRegisteredHistogram("les/server/req/relative", nil, metrics.NewExpDecaySample(1028, 0.015))

requestServedMeter = metrics.NewRegisteredMeter("les/server/req/avgServedTime", nil)
requestServedTimer = metrics.NewRegisteredTimer("les/server/req/servedTime", nil)
requestEstimatedMeter = metrics.NewRegisteredMeter("les/server/req/avgEstimatedTime", nil)
requestEstimatedTimer = metrics.NewRegisteredTimer("les/server/req/estimatedTime", nil)
relativeCostHistogram = metrics.NewRegisteredHistogram("les/server/req/relative", nil, metrics.NewExpDecaySample(1028, 0.015))
relativeCostHeaderHistogram = metrics.NewRegisteredHistogram("les/server/req/relative/header", nil, metrics.NewExpDecaySample(1028, 0.015))
relativeCostBodyHistogram = metrics.NewRegisteredHistogram("les/server/req/relative/body", nil, metrics.NewExpDecaySample(1028, 0.015))
relativeCostReceiptHistogram = metrics.NewRegisteredHistogram("les/server/req/relative/receipt", nil, metrics.NewExpDecaySample(1028, 0.015))
relativeCostCodeHistogram = metrics.NewRegisteredHistogram("les/server/req/relative/code", nil, metrics.NewExpDecaySample(1028, 0.015))
relativeCostProofHistogram = metrics.NewRegisteredHistogram("les/server/req/relative/proof", nil, metrics.NewExpDecaySample(1028, 0.015))
relativeCostHelperProofHistogram = metrics.NewRegisteredHistogram("les/server/req/relative/helperTrie", nil, metrics.NewExpDecaySample(1028, 0.015))
relativeCostSendTxHistogram = metrics.NewRegisteredHistogram("les/server/req/relative/txs", nil, metrics.NewExpDecaySample(1028, 0.015))
relativeCostTxStatusHistogram = metrics.NewRegisteredHistogram("les/server/req/relative/txStatus", nil, metrics.NewExpDecaySample(1028, 0.015))

globalFactorGauge = metrics.NewRegisteredGauge("les/server/globalFactor", nil)
recentServedGauge = metrics.NewRegisteredGauge("les/server/recentRequestServed", nil)
recentEstimatedGauge = metrics.NewRegisteredGauge("les/server/recentRequestEstimated", nil)
sqServedGauge = metrics.NewRegisteredGauge("les/server/servingQueue/served", nil)
1 change: 1 addition & 0 deletions les/server.go
@@ -113,6 +113,7 @@ func NewLesServer(e *eth.Ethereum, config *eth.Config) (*LesServer, error) {
}
srv.fcManager.SetCapacityLimits(srv.freeCapacity, maxCapacity, srv.freeCapacity*2)
srv.clientPool = newClientPool(srv.chainDb, srv.freeCapacity, 10000, mclock.System{}, func(id enode.ID) { go srv.peers.Unregister(peerIdToString(id)) })
srv.clientPool.setPriceFactors(priceFactors{0, 1, 1}, priceFactors{0, 1, 1})

checkpoint := srv.latestLocalCheckpoint()
if !checkpoint.Empty() {
8 changes: 8 additions & 0 deletions les/server_handler.go
@@ -268,6 +268,7 @@ func (h *serverHandler) handleMsg(p *peer, wg *sync.WaitGroup) error {
if metrics.EnabledExpensive {
miscInHeaderPacketsMeter.Mark(1)
miscInHeaderTrafficMeter.Mark(int64(msg.Size))
defer func(start time.Time) { miscServingTimeHeaderTimer.UpdateSince(start) }(time.Now())
}
var req struct {
ReqID uint64
@@ -380,6 +381,7 @@ func (h *serverHandler) handleMsg(p *peer, wg *sync.WaitGroup) error {
if metrics.EnabledExpensive {
miscInBodyPacketsMeter.Mark(1)
miscInBodyTrafficMeter.Mark(int64(msg.Size))
defer func(start time.Time) { miscServingTimeBodyTimer.UpdateSince(start) }(time.Now())
}
var req struct {
ReqID uint64
@@ -428,6 +430,7 @@ func (h *serverHandler) handleMsg(p *peer, wg *sync.WaitGroup) error {
if metrics.EnabledExpensive {
miscInCodePacketsMeter.Mark(1)
miscInCodeTrafficMeter.Mark(int64(msg.Size))
defer func(start time.Time) { miscServingTimeCodeTimer.UpdateSince(start) }(time.Now())
}
var req struct {
ReqID uint64
@@ -499,6 +502,7 @@ func (h *serverHandler) handleMsg(p *peer, wg *sync.WaitGroup) error {
if metrics.EnabledExpensive {
miscInReceiptPacketsMeter.Mark(1)
miscInReceiptTrafficMeter.Mark(int64(msg.Size))
defer func(start time.Time) { miscServingTimeReceiptTimer.UpdateSince(start) }(time.Now())
}
var req struct {
ReqID uint64
@@ -555,6 +559,7 @@ func (h *serverHandler) handleMsg(p *peer, wg *sync.WaitGroup) error {
if metrics.EnabledExpensive {
miscInTrieProofPacketsMeter.Mark(1)
miscInTrieProofTrafficMeter.Mark(int64(msg.Size))
defer func(start time.Time) { miscServingTimeTrieProofTimer.UpdateSince(start) }(time.Now())
}
var req struct {
ReqID uint64
@@ -657,6 +662,7 @@ func (h *serverHandler) handleMsg(p *peer, wg *sync.WaitGroup) error {
if metrics.EnabledExpensive {
miscInHelperTriePacketsMeter.Mark(1)
miscInHelperTrieTrafficMeter.Mark(int64(msg.Size))
defer func(start time.Time) { miscServingTimeHelperTrieTimer.UpdateSince(start) }(time.Now())
}
var req struct {
ReqID uint64
@@ -731,6 +737,7 @@ func (h *serverHandler) handleMsg(p *peer, wg *sync.WaitGroup) error {
if metrics.EnabledExpensive {
miscInTxsPacketsMeter.Mark(1)
miscInTxsTrafficMeter.Mark(int64(msg.Size))
defer func(start time.Time) { miscServingTimeTxTimer.UpdateSince(start) }(time.Now())
}
var req struct {
ReqID uint64
@@ -779,6 +786,7 @@ func (h *serverHandler) handleMsg(p *peer, wg *sync.WaitGroup) error {
if metrics.EnabledExpensive {
miscInTxStatusPacketsMeter.Mark(1)
miscInTxStatusTrafficMeter.Mark(int64(msg.Size))
defer func(start time.Time) { miscServingTimeTxStatusTimer.UpdateSince(start) }(time.Now())
}
var req struct {
ReqID uint64
3 changes: 0 additions & 3 deletions light/lightchain.go
@@ -438,9 +438,6 @@ func (lc *LightChain) GetBlockHashesFromHash(hash common.Hash, max uint64) []com
//
// Note: ancestor == 0 returns the same block, 1 returns its parent and so on.
func (lc *LightChain) GetAncestor(hash common.Hash, number, ancestor uint64, maxNonCanonical *uint64) (common.Hash, uint64) {
lc.chainmu.RLock()
defer lc.chainmu.RUnlock()

return lc.hc.GetAncestor(hash, number, ancestor, maxNonCanonical)
}
