Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

les: detailed relative cost metrics (test only, do not merge) #20036

Closed
wants to merge 10 commits into from
3 changes: 0 additions & 3 deletions core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -2151,9 +2151,6 @@ func (bc *BlockChain) GetBlockHashesFromHash(hash common.Hash, max uint64) []com
//
// Note: ancestor == 0 returns the same block, 1 returns its parent and so on.
func (bc *BlockChain) GetAncestor(hash common.Hash, number, ancestor uint64, maxNonCanonical *uint64) (common.Hash, uint64) {
bc.chainmu.RLock()
defer bc.chainmu.RUnlock()

return bc.hc.GetAncestor(hash, number, ancestor, maxNonCanonical)
}

Expand Down
7 changes: 5 additions & 2 deletions core/headerchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -349,8 +349,11 @@ func (hc *HeaderChain) GetAncestor(hash common.Hash, number, ancestor uint64, ma
}
for ancestor != 0 {
if rawdb.ReadCanonicalHash(hc.chainDb, number) == hash {
number -= ancestor
return rawdb.ReadCanonicalHash(hc.chainDb, number), number
ancestorHash := rawdb.ReadCanonicalHash(hc.chainDb, number-ancestor)
if rawdb.ReadCanonicalHash(hc.chainDb, number) == hash {
number -= ancestor
return ancestorHash, number
}
}
if *maxNonCanonical == 0 {
return common.Hash{}, 0
Expand Down
16 changes: 9 additions & 7 deletions les/clientpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import (
const (
negBalanceExpTC = time.Hour // time constant for exponentially reducing negative balance
fixedPointMultiplier = 0x1000000 // constant to convert logarithms to fixed point format
connectedBias = time.Minute // this bias is applied in favor of already connected clients in order to avoid kicking them out very soon
connectedBias = time.Minute * 5 // this bias is applied in favor of already connected clients in order to avoid kicking them out very soon
lazyQueueRefresh = time.Second * 10 // refresh period of the connected queue
)

Expand Down Expand Up @@ -366,12 +366,14 @@ func (f *clientPool) setLimits(count int, totalCap uint64) {

f.countLimit = count
f.capacityLimit = totalCap
now := mclock.Now()
f.connectedQueue.MultiPop(func(data interface{}, priority int64) bool {
c := data.(*clientInfo)
f.dropClient(c, now, true)
return f.connectedCapacity > f.capacityLimit || f.connectedQueue.Size() > f.countLimit
})
if f.connectedCapacity > f.capacityLimit || f.connectedQueue.Size() > f.countLimit {
now := mclock.Now()
f.connectedQueue.MultiPop(func(data interface{}, priority int64) bool {
c := data.(*clientInfo)
f.dropClient(c, now, true)
return f.connectedCapacity > f.capacityLimit || f.connectedQueue.Size() > f.countLimit
})
}
}

// requestCost feeds request cost after serving a request from the given peer.
Expand Down
62 changes: 58 additions & 4 deletions les/costtracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/les/flowcontrol"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/metrics"
)

const makeCostStats = false // make request cost statistics during operation
Expand Down Expand Up @@ -87,7 +88,7 @@ const (
gfUsageTC = time.Second
gfRaiseTC = time.Second * 200
gfDropTC = time.Second * 50
gfDbKey = "_globalCostFactorV3"
gfDbKey = "_globalCostFactorV6"
)

// costTracker is responsible for calculating costs and cost estimates on the
Expand Down Expand Up @@ -226,6 +227,9 @@ type reqInfo struct {
// servingTime is the CPU time corresponding to the actual processing of
// the request.
servingTime float64

// msgCode indicates the type of request.
msgCode uint64
}

// gfLoop starts an event loop which updates the global cost factor which is
Expand Down Expand Up @@ -265,17 +269,67 @@ func (ct *costTracker) gfLoop() {
log.Debug("global cost factor saved", "value", factor)
}
saveTicker := time.NewTicker(time.Minute * 10)
mclockTicker := time.NewTicker(time.Millisecond * 100)
lastClock := mclock.Now()
noUpdate := lastClock

for {
select {
case <-mclockTicker.C:
now := mclock.Now()
dt := time.Duration(now - lastClock)
lastClock = now
mclockTimer.Update(dt)
if dt > time.Millisecond*300 {
noUpdate = now + mclock.AbsTime(time.Second*5)
}

case r := <-ct.reqInfoCh:
now := mclock.Now()
if time.Duration(now-lastClock) > time.Millisecond*300 {
noUpdate = now + mclock.AbsTime(time.Second*5)
}
if now < noUpdate {
continue
}
relCost := int64(factor * r.servingTime * 100 / r.avgTimeCost) // Convert the value to a percentage form

// Record more metrics if we are debugging
if metrics.EnabledExpensive {
switch r.msgCode {
case GetBlockHeadersMsg:
relativeCostHeaderHistogram.Update(relCost)
case GetBlockBodiesMsg:
relativeCostBodyHistogram.Update(relCost)
case GetReceiptsMsg:
relativeCostReceiptHistogram.Update(relCost)
case GetCodeMsg:
relativeCostCodeHistogram.Update(relCost)
case GetProofsV2Msg:
relativeCostProofHistogram.Update(relCost)
case GetHelperTrieProofsMsg:
relativeCostHelperProofHistogram.Update(relCost)
case SendTxV2Msg:
relativeCostSendTxHistogram.Update(relCost)
case GetTxStatusMsg:
relativeCostTxStatusHistogram.Update(relCost)
}
}
// SendTxV2 and GetTxStatus requests are two special cases.
// All other requests will only put pressure on the database, and
// the corresponding delay is relatively stable. While these two
// requests involve txpool query, which is usually unstable.
//
// TODO(rjl493456442) fixes this.
if r.msgCode == SendTxV2Msg || r.msgCode == GetTxStatusMsg {
continue
}
requestServedMeter.Mark(int64(r.servingTime))
requestServedTimer.Update(time.Duration(r.servingTime))
requestEstimatedMeter.Mark(int64(r.avgTimeCost / factor))
requestEstimatedTimer.Update(time.Duration(r.avgTimeCost / factor))
relativeCostHistogram.Update(int64(r.avgTimeCost / factor / r.servingTime))
relativeCostHistogram.Update(relCost)

now := mclock.Now()
dt := float64(now - expUpdate)
expUpdate = now
exp := math.Exp(-dt / float64(gfUsageTC))
Expand Down Expand Up @@ -375,7 +429,7 @@ func (ct *costTracker) updateStats(code, amount, servingTime, realCost uint64) {
avg := reqAvgTimeCost[code]
avgTimeCost := avg.baseCost + amount*avg.reqCost
select {
case ct.reqInfoCh <- reqInfo{float64(avgTimeCost), float64(servingTime)}:
case ct.reqInfoCh <- reqInfo{float64(avgTimeCost), float64(servingTime), code}:
default:
}
if makeCostStats {
Expand Down
29 changes: 24 additions & 5 deletions les/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,15 @@ var (
miscOutTxStatusPacketsMeter = metrics.NewRegisteredMeter("les/misc/out/packets/txStatus", nil)
miscOutTxStatusTrafficMeter = metrics.NewRegisteredMeter("les/misc/out/traffic/txStatus", nil)

miscServingTimeHeaderTimer = metrics.NewRegisteredTimer("les/misc/serve/header", nil)
miscServingTimeBodyTimer = metrics.NewRegisteredTimer("les/misc/serve/body", nil)
miscServingTimeCodeTimer = metrics.NewRegisteredTimer("les/misc/serve/code", nil)
miscServingTimeReceiptTimer = metrics.NewRegisteredTimer("les/misc/serve/receipt", nil)
miscServingTimeTrieProofTimer = metrics.NewRegisteredTimer("les/misc/serve/proof", nil)
miscServingTimeHelperTrieTimer = metrics.NewRegisteredTimer("les/misc/serve/helperTrie", nil)
miscServingTimeTxTimer = metrics.NewRegisteredTimer("les/misc/serve/tx", nil)
miscServingTimeTxStatusTimer = metrics.NewRegisteredTimer("les/misc/serve/txstatus", nil)

connectionTimer = metrics.NewRegisteredTimer("les/connection/duration", nil)
serverConnectionGauge = metrics.NewRegisteredGauge("les/connection/server", nil)
clientConnectionGauge = metrics.NewRegisteredGauge("les/connection/client", nil)
Expand All @@ -69,11 +78,21 @@ var (
totalConnectedGauge = metrics.NewRegisteredGauge("les/server/totalConnected", nil)
blockProcessingTimer = metrics.NewRegisteredTimer("les/server/blockProcessingTime", nil)

requestServedMeter = metrics.NewRegisteredMeter("les/server/req/avgServedTime", nil)
requestServedTimer = metrics.NewRegisteredTimer("les/server/req/servedTime", nil)
requestEstimatedMeter = metrics.NewRegisteredMeter("les/server/req/avgEstimatedTime", nil)
requestEstimatedTimer = metrics.NewRegisteredTimer("les/server/req/estimatedTime", nil)
relativeCostHistogram = metrics.NewRegisteredHistogram("les/server/req/relative", nil, metrics.NewExpDecaySample(1028, 0.015))
requestServedMeter = metrics.NewRegisteredMeter("les/server/req/avgServedTime", nil)
requestServedTimer = metrics.NewRegisteredTimer("les/server/req/servedTime", nil)
requestEstimatedMeter = metrics.NewRegisteredMeter("les/server/req/avgEstimatedTime", nil)
requestEstimatedTimer = metrics.NewRegisteredTimer("les/server/req/estimatedTime", nil)
relativeCostHistogram = metrics.NewRegisteredHistogram("les/server/req/relative", nil, metrics.NewExpDecaySample(1028, 0.015))
relativeCostHeaderHistogram = metrics.NewRegisteredHistogram("les/server/req/relative/header", nil, metrics.NewExpDecaySample(1028, 0.015))
relativeCostBodyHistogram = metrics.NewRegisteredHistogram("les/server/req/relative/body", nil, metrics.NewExpDecaySample(1028, 0.015))
relativeCostReceiptHistogram = metrics.NewRegisteredHistogram("les/server/req/relative/receipt", nil, metrics.NewExpDecaySample(1028, 0.015))
relativeCostCodeHistogram = metrics.NewRegisteredHistogram("les/server/req/relative/code", nil, metrics.NewExpDecaySample(1028, 0.015))
relativeCostProofHistogram = metrics.NewRegisteredHistogram("les/server/req/relative/proof", nil, metrics.NewExpDecaySample(1028, 0.015))
relativeCostHelperProofHistogram = metrics.NewRegisteredHistogram("les/server/req/relative/helperproof", nil, metrics.NewExpDecaySample(1028, 0.015))
relativeCostSendTxHistogram = metrics.NewRegisteredHistogram("les/server/req/relative/sendtx", nil, metrics.NewExpDecaySample(1028, 0.015))
relativeCostTxStatusHistogram = metrics.NewRegisteredHistogram("les/server/req/relative/txstatus", nil, metrics.NewExpDecaySample(1028, 0.015))

mclockTimer = metrics.NewRegisteredTimer("les/server/mclock", nil)

recentServedGauge = metrics.NewRegisteredGauge("les/server/recentRequestServed", nil)
recentEstimatedGauge = metrics.NewRegisteredGauge("les/server/recentRequestEstimated", nil)
Expand Down
1 change: 1 addition & 0 deletions les/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ func NewLesServer(e *eth.Ethereum, config *eth.Config) (*LesServer, error) {
}
srv.fcManager.SetCapacityLimits(srv.freeCapacity, maxCapacity, srv.freeCapacity*2)
srv.clientPool = newClientPool(srv.chainDb, srv.freeCapacity, 10000, mclock.System{}, func(id enode.ID) { go srv.peers.Unregister(peerIdToString(id)) })
srv.clientPool.setPriceFactors(priceFactors{0, 1, 1}, priceFactors{0, 1, 1})

checkpoint := srv.latestLocalCheckpoint()
if !checkpoint.Empty() {
Expand Down
25 changes: 18 additions & 7 deletions les/server_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,7 @@ func (h *serverHandler) handleMsg(p *peer, wg *sync.WaitGroup) error {
if metrics.EnabledExpensive {
miscInHeaderPacketsMeter.Mark(1)
miscInHeaderTrafficMeter.Mark(int64(msg.Size))
defer func(start time.Time) { miscServingTimeHeaderTimer.UpdateSince(start) }(time.Now())
}
var req struct {
ReqID uint64
Expand Down Expand Up @@ -380,6 +381,7 @@ func (h *serverHandler) handleMsg(p *peer, wg *sync.WaitGroup) error {
if metrics.EnabledExpensive {
miscInBodyPacketsMeter.Mark(1)
miscInBodyTrafficMeter.Mark(int64(msg.Size))
defer func(start time.Time) { miscServingTimeBodyTimer.UpdateSince(start) }(time.Now())
}
var req struct {
ReqID uint64
Expand Down Expand Up @@ -428,6 +430,7 @@ func (h *serverHandler) handleMsg(p *peer, wg *sync.WaitGroup) error {
if metrics.EnabledExpensive {
miscInCodePacketsMeter.Mark(1)
miscInCodeTrafficMeter.Mark(int64(msg.Size))
defer func(start time.Time) { miscServingTimeCodeTimer.UpdateSince(start) }(time.Now())
}
var req struct {
ReqID uint64
Expand Down Expand Up @@ -499,6 +502,7 @@ func (h *serverHandler) handleMsg(p *peer, wg *sync.WaitGroup) error {
if metrics.EnabledExpensive {
miscInReceiptPacketsMeter.Mark(1)
miscInReceiptTrafficMeter.Mark(int64(msg.Size))
defer func(start time.Time) { miscServingTimeReceiptTimer.UpdateSince(start) }(time.Now())
}
var req struct {
ReqID uint64
Expand Down Expand Up @@ -555,6 +559,7 @@ func (h *serverHandler) handleMsg(p *peer, wg *sync.WaitGroup) error {
if metrics.EnabledExpensive {
miscInTrieProofPacketsMeter.Mark(1)
miscInTrieProofTrafficMeter.Mark(int64(msg.Size))
defer func(start time.Time) { miscServingTimeTrieProofTimer.UpdateSince(start) }(time.Now())
}
var req struct {
ReqID uint64
Expand Down Expand Up @@ -657,6 +662,7 @@ func (h *serverHandler) handleMsg(p *peer, wg *sync.WaitGroup) error {
if metrics.EnabledExpensive {
miscInHelperTriePacketsMeter.Mark(1)
miscInHelperTrieTrafficMeter.Mark(int64(msg.Size))
defer func(start time.Time) { miscServingTimeHelperTrieTimer.UpdateSince(start) }(time.Now())
}
var req struct {
ReqID uint64
Expand Down Expand Up @@ -731,6 +737,7 @@ func (h *serverHandler) handleMsg(p *peer, wg *sync.WaitGroup) error {
if metrics.EnabledExpensive {
miscInTxsPacketsMeter.Mark(1)
miscInTxsTrafficMeter.Mark(int64(msg.Size))
defer func(start time.Time) { miscServingTimeTxTimer.UpdateSince(start) }(time.Now())
}
var req struct {
ReqID uint64
Expand Down Expand Up @@ -779,6 +786,7 @@ func (h *serverHandler) handleMsg(p *peer, wg *sync.WaitGroup) error {
if metrics.EnabledExpensive {
miscInTxStatusPacketsMeter.Mark(1)
miscInTxStatusTrafficMeter.Mark(int64(msg.Size))
defer func(start time.Time) { miscServingTimeTxStatusTimer.UpdateSince(start) }(time.Now())
}
var req struct {
ReqID uint64
Expand Down Expand Up @@ -867,16 +875,19 @@ func (h *serverHandler) getAuxiliaryHeaders(req HelperTrieReq) []byte {
// txStatus returns the status of a specified transaction.
func (h *serverHandler) txStatus(hash common.Hash) light.TxStatus {
var stat light.TxStatus
// Looking the transaction in txpool first.
stat.Status = h.txpool.Status([]common.Hash{hash})[0]

// If the transaction is unknown to the pool, try looking it up locally.
// Looking the transaction in database first.
// The reason here is usually txpool has a high pressure in the mainnet,
// the cost to query the status in pool is even higher than database.
lookup := h.blockchain.GetTransactionLookup(hash)
if lookup != nil {
stat.Status = core.TxStatusIncluded
stat.Lookup = lookup
}
// If the transaction is unknown to the database, try looking it up in txpool.
if stat.Status == core.TxStatusUnknown {
lookup := h.blockchain.GetTransactionLookup(hash)
if lookup != nil {
stat.Status = core.TxStatusIncluded
stat.Lookup = lookup
}
stat.Status = h.txpool.Status([]common.Hash{hash})[0]
}
return stat
}
Expand Down