Merge pull request bnb-chain#2 from flywukong/IO-metrics
I/O and prefetcher cost metrics
forcodedancing authored Feb 18, 2022
2 parents 72a780f + df4fdd7 commit 528eb1d
Showing 14 changed files with 329 additions and 18 deletions.
124 changes: 124 additions & 0 deletions cachemetrics/cache_metrics.go
@@ -0,0 +1,124 @@
package cachemetrics

import (
"time"

"github.com/ethereum/go-ethereum/metrics"
)

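// cacheLayerName identifies one layer of the layered account/storage read
// path: the L1-L3 names are in-memory caches and the L4 names are the disk layer.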
type cacheLayerName string

const (
CacheL1ACCOUNT cacheLayerName = "CACHE_L1_ACCOUNT"
CacheL2ACCOUNT cacheLayerName = "CACHE_L2_ACCOUNT"
CacheL3ACCOUNT cacheLayerName = "CACHE_L3_ACCOUNT"
DiskL4ACCOUNT cacheLayerName = "DISK_L4_ACCOUNT"
CacheL1STORAGE cacheLayerName = "CACHE_L1_STORAGE"
CacheL2STORAGE cacheLayerName = "CACHE_L2_STORAGE"
CacheL3STORAGE cacheLayerName = "CACHE_L3_STORAGE"
DiskL4STORAGE cacheLayerName = "DISK_L4_STORAGE"
)

var (
cacheL1AccountTimer = metrics.NewRegisteredTimer("cache/cost/account/layer1", nil)
cacheL2AccountTimer = metrics.NewRegisteredTimer("cache/cost/account/layer2", nil)
cacheL3AccountTimer = metrics.NewRegisteredTimer("cache/cost/account/layer3", nil)
diskL4AccountTimer = metrics.NewRegisteredTimer("cache/cost/account/layer4", nil)
cacheL1StorageTimer = metrics.NewRegisteredTimer("cache/cost/storage/layer1", nil)
cacheL2StorageTimer = metrics.NewRegisteredTimer("cache/cost/storage/layer2", nil)
cacheL3StorageTimer = metrics.NewRegisteredTimer("cache/cost/storage/layer3", nil)
diskL4StorageTimer = metrics.NewRegisteredTimer("cache/cost/storage/layer4", nil)

cacheL1AccountCounter = metrics.NewRegisteredCounter("cache/count/account/layer1", nil)
cacheL2AccountCounter = metrics.NewRegisteredCounter("cache/count/account/layer2", nil)
cacheL3AccountCounter = metrics.NewRegisteredCounter("cache/count/account/layer3", nil)
diskL4AccountCounter = metrics.NewRegisteredCounter("cache/count/account/layer4", nil)
cacheL1StorageCounter = metrics.NewRegisteredCounter("cache/count/storage/layer1", nil)
cacheL2StorageCounter = metrics.NewRegisteredCounter("cache/count/storage/layer2", nil)
cacheL3StorageCounter = metrics.NewRegisteredCounter("cache/count/storage/layer3", nil)
diskL4StorageCounter = metrics.NewRegisteredCounter("cache/count/storage/layer4", nil)

cacheL1AccountCostCounter = metrics.NewRegisteredCounter("cache/totalcost/account/layer1", nil)
cacheL2AccountCostCounter = metrics.NewRegisteredCounter("cache/totalcost/account/layer2", nil)
cacheL3AccountCostCounter = metrics.NewRegisteredCounter("cache/totalcost/account/layer3", nil)
diskL4AccountCostCounter = metrics.NewRegisteredCounter("cache/totalcost/account/layer4", nil)
cacheL1StorageCostCounter = metrics.NewRegisteredCounter("cache/totalcost/storage/layer1", nil)
cacheL2StorageCostCounter = metrics.NewRegisteredCounter("cache/totalcost/storage/layer2", nil)
cacheL3StorageCostCounter = metrics.NewRegisteredCounter("cache/totalcost/storage/layer3", nil)
diskL4StorageCostCounter = metrics.NewRegisteredCounter("cache/totalcost/storage/layer4", nil)
)

// RecordCacheDepth increments the hit counter of the given cache layer.
func RecordCacheDepth(metricsName cacheLayerName) {
switch metricsName {
case CacheL1ACCOUNT:
cacheL1AccountCounter.Inc(1)
case CacheL2ACCOUNT:
cacheL2AccountCounter.Inc(1)
case CacheL3ACCOUNT:
cacheL3AccountCounter.Inc(1)
case DiskL4ACCOUNT:
diskL4AccountCounter.Inc(1)
case CacheL1STORAGE:
cacheL1StorageCounter.Inc(1)
case CacheL2STORAGE:
cacheL2StorageCounter.Inc(1)
case CacheL3STORAGE:
cacheL3StorageCounter.Inc(1)
case DiskL4STORAGE:
diskL4StorageCounter.Inc(1)
}
}

// RecordCacheMetrics records the latency of a single read served by the given cache layer.
func RecordCacheMetrics(metricsName cacheLayerName, start time.Time) {
switch metricsName {
case CacheL1ACCOUNT:
recordCost(cacheL1AccountTimer, start)
case CacheL2ACCOUNT:
recordCost(cacheL2AccountTimer, start)
case CacheL3ACCOUNT:
recordCost(cacheL3AccountTimer, start)
case DiskL4ACCOUNT:
recordCost(diskL4AccountTimer, start)
case CacheL1STORAGE:
recordCost(cacheL1StorageTimer, start)
case CacheL2STORAGE:
recordCost(cacheL2StorageTimer, start)
case CacheL3STORAGE:
recordCost(cacheL3StorageTimer, start)
case DiskL4STORAGE:
recordCost(diskL4StorageTimer, start)
}
}

// RecordTotalCosts accumulates the total read latency of the given cache layer.
func RecordTotalCosts(metricsName cacheLayerName, start time.Time) {
switch metricsName {
case CacheL1ACCOUNT:
accumulateCost(cacheL1AccountCostCounter, start)
case CacheL2ACCOUNT:
accumulateCost(cacheL2AccountCostCounter, start)
case CacheL3ACCOUNT:
accumulateCost(cacheL3AccountCostCounter, start)
case DiskL4ACCOUNT:
accumulateCost(diskL4AccountCostCounter, start)
case CacheL1STORAGE:
accumulateCost(cacheL1StorageCostCounter, start)
case CacheL2STORAGE:
accumulateCost(cacheL2StorageCostCounter, start)
case CacheL3STORAGE:
accumulateCost(cacheL3StorageCostCounter, start)
case DiskL4STORAGE:
accumulateCost(diskL4StorageCostCounter, start)
}
}

// recordCost updates the layer's timer with the elapsed time since start.
func recordCost(timer metrics.Timer, start time.Time) {
timer.Update(time.Since(start))
}

// accumulateCost adds the elapsed nanoseconds since start to the layer's
// total-cost counter.
func accumulateCost(totalcost metrics.Counter, start time.Time) {
totalcost.Inc(time.Since(start).Nanoseconds())
}
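Each layer therefore gets three instruments: a timer for the per-read latency distribution, a counter for hit counts, and a counter accumulating total cost in nanoseconds, which lets a dashboard attribute shares of overall read time to individual layers. A minimal sketch of a call site follows (readAccount is a hypothetical stand-in for a cache lookup; the real call sites appear in the snapshot layers further down):

package main

import (
    "time"

    "github.com/ethereum/go-ethereum/cachemetrics"
)

// readAccount stands in for a real cache lookup.
func readAccount() []byte { return nil }

func main() {
    start := time.Now()
    _ = readAccount()
    // A hit on the L3 account cache: bump the hit counter, record this
    // read's latency, and add it to the layer's accumulated total.
    cachemetrics.RecordCacheDepth(cachemetrics.CacheL3ACCOUNT)
    cachemetrics.RecordCacheMetrics(cachemetrics.CacheL3ACCOUNT, start)
    cachemetrics.RecordTotalCosts(cachemetrics.CacheL3ACCOUNT, start)
}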
6 changes: 6 additions & 0 deletions cmd/geth/config.go
@@ -203,6 +203,12 @@ func applyMetricConfig(ctx *cli.Context, cfg *gethConfig) {
if ctx.GlobalIsSet(utils.MetricsEnabledExpensiveFlag.Name) {
cfg.Metrics.EnabledExpensive = ctx.GlobalBool(utils.MetricsEnabledExpensiveFlag.Name)
}
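// Note: both branches below assign cfg.Metrics.EnabledExpensive; this looks
// like a copy-paste slip, and each flag presumably should set its own
// dedicated config field instead.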
if ctx.GlobalIsSet(utils.MetricsEnabledRecordIOFlag.Name) {
cfg.Metrics.EnabledExpensive = ctx.GlobalBool(utils.MetricsEnabledRecordIOFlag.Name)
}
if ctx.GlobalIsSet(utils.MetricsDisablePrefetchFlag.Name) {
cfg.Metrics.EnabledExpensive = ctx.GlobalBool(utils.MetricsDisablePrefetchFlag.Name)
}
if ctx.GlobalIsSet(utils.MetricsHTTPFlag.Name) {
cfg.Metrics.HTTP = ctx.GlobalString(utils.MetricsHTTPFlag.Name)
}
2 changes: 2 additions & 0 deletions cmd/geth/main.go
@@ -205,6 +205,8 @@ var (
metricsFlags = []cli.Flag{
utils.MetricsEnabledFlag,
utils.MetricsEnabledExpensiveFlag,
utils.MetricsEnabledRecordIOFlag,
utils.MetricsDisablePrefetchFlag,
utils.MetricsHTTPFlag,
utils.MetricsPortFlag,
utils.MetricsEnableInfluxDBFlag,
8 changes: 8 additions & 0 deletions cmd/utils/flags.go
@@ -744,6 +744,14 @@ var (
Name: "metrics.expensive",
Usage: "Enable expensive metrics collection and reporting",
}
MetricsDisablePrefetchFlag = cli.BoolFlag{
Name: "metrics.noprefetch",
Usage: "disable prefetch when metrics collecting",
}
MetricsEnabledRecordIOFlag = cli.BoolFlag{
Name: "metrics.iorecord",
Usage: "Enable recording io metrics",
}

// MetricsHTTPFlag defines the endpoint for a stand-alone metrics HTTP endpoint.
// Since the pprof service enables sensitive/vulnerable behavior, this allows a user
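With both flags registered here and wired through applyMetricConfig above, the new collection would presumably be switched on with an invocation along the lines of geth --metrics --metrics.iorecord, adding --metrics.noprefetch to measure raw read costs without the prefetcher warming the caches first.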
11 changes: 11 additions & 0 deletions core/blockchain.go
@@ -66,6 +66,9 @@ var (

snapshotAccountReadTimer = metrics.NewRegisteredTimer("chain/snapshot/account/reads", nil)
snapshotStorageReadTimer = metrics.NewRegisteredTimer("chain/snapshot/storage/reads", nil)
totalAccountReadTimer = metrics.NewRegisteredTimer("chain/total/account/reads", nil)
totalStorageReadTimer = metrics.NewRegisteredTimer("chain/total/storage/reads", nil)
totalReadTimer = metrics.NewRegisteredTimer("chain/total/cost/reads", nil)
snapshotCommitTimer = metrics.NewRegisteredTimer("chain/snapshot/commits", nil)

blockInsertTimer = metrics.NewRegisteredTimer("chain/inserts", nil)
@@ -2151,6 +2154,14 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, er
snapshotAccountReadTimer.Update(statedb.SnapshotAccountReads) // Account reads are complete, we can mark them
snapshotStorageReadTimer.Update(statedb.SnapshotStorageReads) // Storage reads are complete, we can mark them

accountReadCost := statedb.SnapshotAccountReads + statedb.L1CacheAccountReads + statedb.AccountReads
storageReadCost := statedb.SnapshotStorageReads + statedb.L1CacheStorageReads + statedb.StorageReads
// Record the total read cost across cache layers L1-L4 for accounts
totalAccountReadTimer.Update(accountReadCost)
// Record the total read cost across cache layers L1-L4 for storage
totalStorageReadTimer.Update(storageReadCost)
// Record the combined L1-L4 read cost for accounts and storage
totalReadTimer.Update(storageReadCost + accountReadCost)
blockExecutionTimer.Update(time.Since(substart))

// Validate the state using the default validator
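The sums above combine costs tracked at different layers: SnapshotAccountReads and SnapshotStorageReads time the snapshot path, AccountReads and StorageReads time trie reads, and L1CacheAccountReads and L1CacheStorageReads — presumably new StateDB duration fields introduced in one of the changed files not shown in this excerpt — time the first-level cache, so the new timers approximate the block's full L1-L4 read cost.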
37 changes: 31 additions & 6 deletions core/state/snapshot/difflayer.go
@@ -19,6 +19,7 @@ package snapshot
import (
"encoding/binary"
"fmt"
"github.com/ethereum/go-ethereum/cachemetrics"
"math"
"math/rand"
"sort"
@@ -326,6 +327,16 @@ func (dl *diffLayer) AccountRLP(hash common.Hash) ([]byte, error) {
if !hit {
hit = dl.diffed.Contains(destructBloomHasher(hash))
}

start := time.Now()
hitInDifflayer := false
defer func() {
if hitInDifflayer {
cachemetrics.RecordCacheDepth("CACHE_L2_ACCOUNT")
cachemetrics.RecordCacheMetrics("CACHE_L2_ACCOUNT", start)
cachemetrics.RecordTotalCosts("CACHE_L2_ACCOUNT", start)
}
}()
var origin *diskLayer
if !hit {
origin = dl.origin // extract origin while holding the lock
@@ -339,13 +350,13 @@ func (dl *diffLayer) AccountRLP(hash common.Hash) ([]byte, error) {
return origin.AccountRLP(hash)
}
// The bloom filter hit, start poking in the internal maps
return dl.accountRLP(hash, 0)
return dl.accountRLP(hash, 0, &hitInDifflayer)
}

// accountRLP is an internal version of AccountRLP that skips the bloom filter
// checks and uses the internal maps to try and retrieve the data. It's meant
// to be used if a higher layer's bloom filter hit already.
func (dl *diffLayer) accountRLP(hash common.Hash, depth int) ([]byte, error) {
func (dl *diffLayer) accountRLP(hash common.Hash, depth int, hit *bool) ([]byte, error) {
dl.lock.RLock()
defer dl.lock.RUnlock()

@@ -360,6 +371,7 @@ func (dl *diffLayer) accountRLP(hash common.Hash, depth int) ([]byte, error) {
snapshotDirtyAccountHitDepthHist.Update(int64(depth))
snapshotDirtyAccountReadMeter.Mark(int64(len(data)))
snapshotBloomAccountTrueHitMeter.Mark(1)
*hit = true
return data, nil
}
// If the account is known locally, but deleted, return it
@@ -368,11 +380,12 @@ func (dl *diffLayer) accountRLP(hash common.Hash, depth int) ([]byte, error) {
snapshotDirtyAccountHitDepthHist.Update(int64(depth))
snapshotDirtyAccountInexMeter.Mark(1)
snapshotBloomAccountTrueHitMeter.Mark(1)
*hit = true
return nil, nil
}
// Account unknown to this diff, resolve from parent
if diff, ok := dl.parent.(*diffLayer); ok {
return diff.accountRLP(hash, depth+1)
return diff.accountRLP(hash, depth+1, hit)
}
// Failed to resolve through diff layers, mark a bloom error and use the disk
snapshotBloomAccountFalseHitMeter.Mark(1)
@@ -387,6 +400,16 @@ func (dl *diffLayer) accountRLP(hash common.Hash, depth int) ([]byte, error) {
func (dl *diffLayer) Storage(accountHash, storageHash common.Hash) ([]byte, error) {
start := time.Now()
hitInDifflayer := false
defer func() {
if hitInDifflayer {
cachemetrics.RecordCacheDepth("CACHE_L2_STORAGE")
cachemetrics.RecordCacheMetrics("CACHE_L2_STORAGE", start)
cachemetrics.RecordTotalCosts("CACHE_L2_STORAGE", start)
}
}()

// Check the bloom filter first whether there's even a point in reaching into
// all the maps in all the layers below
dl.lock.RLock()
hit := dl.diffed.Contains(storageBloomHasher{accountHash, storageHash})
if !hit {
@@ -405,13 +428,13 @@ func (dl *diffLayer) Storage(accountHash, storageHash common.Hash) ([]byte, erro
return origin.Storage(accountHash, storageHash)
}
// The bloom filter hit, start poking in the internal maps
return dl.storage(accountHash, storageHash, 0)
return dl.storage(accountHash, storageHash, 0, &hitInDifflayer)
}

// storage is an internal version of Storage that skips the bloom filter checks
// and uses the internal maps to try and retrieve the data. It's meant to be
// used if a higher layer's bloom filter hit already.
func (dl *diffLayer) storage(accountHash, storageHash common.Hash, depth int) ([]byte, error) {
func (dl *diffLayer) storage(accountHash, storageHash common.Hash, depth int, hit *bool) ([]byte, error) {
dl.lock.RLock()
defer dl.lock.RUnlock()

@@ -423,6 +446,7 @@ func (dl *diffLayer) storage(accountHash, storageHash common.Hash, depth int) ([
// If the account is known locally, try to resolve the slot locally
if storage, ok := dl.storageData[accountHash]; ok {
if data, ok := storage[storageHash]; ok {
*hit = true
snapshotDirtyStorageHitMeter.Mark(1)
//snapshotDirtyStorageHitDepthHist.Update(int64(depth))
if n := len(data); n > 0 {
@@ -436,6 +460,7 @@ func (dl *diffLayer) storage(accountHash, storageHash common.Hash, depth int) ([
}
// If the account is known locally, but deleted, return an empty slot
if _, ok := dl.destructSet[accountHash]; ok {
*hit = true
snapshotDirtyStorageHitMeter.Mark(1)
//snapshotDirtyStorageHitDepthHist.Update(int64(depth))
snapshotDirtyStorageInexMeter.Mark(1)
@@ -444,7 +469,7 @@ func (dl *diffLayer) storage(accountHash, storageHash common.Hash, depth int) ([
}
// Storage slot unknown to this diff, resolve from parent
if diff, ok := dl.parent.(*diffLayer); ok {
return diff.storage(accountHash, storageHash, depth+1)
return diff.storage(accountHash, storageHash, depth+1, hit)
}
// Failed to resolve through diff layers, mark a bloom error and use the disk
snapshotBloomStorageFalseHitMeter.Mark(1)
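The recurring pattern in this file is a hit flag threaded through the recursion and read by a deferred closure, so the L2 metrics fire only when a diff layer (at any depth) actually served the request. A self-contained sketch of that pattern, with illustrative names:

package main

import (
    "fmt"
    "time"
)

// lookup stands in for a layered cache read; it sets *hit when this layer
// (or a parent reached through it) serves the request.
func lookup(key string, hit *bool) string {
    if key == "known" {
        *hit = true
        return "value"
    }
    return ""
}

func read(key string) string {
    start := time.Now()
    hit := false
    defer func() {
        // Runs after lookup returns; reports only on a genuine hit.
        if hit {
            fmt.Printf("layer hit for %q took %v\n", key, time.Since(start))
        }
    }()
    return lookup(key, &hit)
}

func main() {
    read("known")   // prints a timing line
    read("missing") // reports nothing
}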
22 changes: 20 additions & 2 deletions core/state/snapshot/disklayer.go
@@ -18,7 +18,9 @@ package snapshot

import (
"bytes"
"sync"
"time"

"github.com/VictoriaMetrics/fastcache"
"github.com/ethereum/go-ethereum/cachemetrics"
"github.com/ethereum/go-ethereum/common"
@@ -95,7 +97,7 @@ func (dl *diskLayer) Account(hash common.Hash) (*Account, error) {
func (dl *diskLayer) AccountRLP(hash common.Hash) ([]byte, error) {
dl.lock.RLock()
defer dl.lock.RUnlock()

start := time.Now()
// If the layer was flattened into, consider it invalid (any live reference to
// the original should be marked as unusable).
if dl.stale {
@@ -113,12 +115,20 @@ func (dl *diskLayer) AccountRLP(hash common.Hash) ([]byte, error) {
if blob, found := dl.cache.HasGet(nil, hash[:]); found {
snapshotCleanAccountHitMeter.Mark(1)
snapshotCleanAccountReadMeter.Mark(int64(len(blob)))
cachemetrics.RecordCacheDepth("CACHE_L3_ACCOUNT")
cachemetrics.RecordCacheMetrics("CACHE_L3_ACCOUNT", start)
cachemetrics.RecordTotalCosts("CACHE_L3_ACCOUNT", start)
return blob, nil
}

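// startGetInDisk times only the disk read below, so the L4 metrics exclude
// the failed clean-cache lookup above; RecordTotalCosts keeps using start so
// the total covers the whole miss-plus-disk path.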
startGetInDisk := time.Now()
// Cache doesn't contain account, pull from disk and cache for later
blob := rawdb.ReadAccountSnapshot(dl.diskdb, hash)
dl.cache.Set(hash[:], blob)

cachemetrics.RecordCacheDepth("DISK_L4_ACCOUNT")
cachemetrics.RecordCacheMetrics("DISK_L4_ACCOUNT", startGetInDisk)
cachemetrics.RecordTotalCosts("DISK_L4_ACCOUNT", start)
snapshotCleanAccountMissMeter.Mark(1)
if n := len(blob); n > 0 {
snapshotCleanAccountWriteMeter.Mark(int64(n))
@@ -133,6 +143,7 @@ func (dl *diskLayer) AccountRLP(hash common.Hash) ([]byte, error) {
func (dl *diskLayer) Storage(accountHash, storageHash common.Hash) ([]byte, error) {
dl.lock.RLock()
defer dl.lock.RUnlock()
start := time.Now()

// If the layer was flattened into, consider it invalid (any live reference to
// the original should be marked as unusable).
@@ -148,17 +159,24 @@ func (dl *diskLayer) Storage(accountHash, storageHash common.Hash) ([]byte, erro
}
// If we're in the disk layer, all diff layers missed
snapshotDirtyStorageMissMeter.Mark(1)

// Try to retrieve the storage slot from the memory cache
if blob, found := dl.cache.HasGet(nil, key); found {
snapshotCleanStorageHitMeter.Mark(1)
snapshotCleanStorageReadMeter.Mark(int64(len(blob)))
cachemetrics.RecordCacheDepth("CACHE_L3_STORAGE")
cachemetrics.RecordCacheMetrics("CACHE_L3_STORAGE", start)
cachemetrics.RecordTotalCosts("CACHE_L3_STORAGE", start)
return blob, nil
}
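// As in AccountRLP above, time only the disk read for the L4 metrics while
// the total cost keeps using start to span the full lookup.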
startGetInDisk := time.Now()
// Cache doesn't contain storage slot, pull from disk and cache for later
blob := rawdb.ReadStorageSnapshot(dl.diskdb, accountHash, storageHash)
dl.cache.Set(key, blob)

cachemetrics.RecordCacheDepth("DISK_L4_STORAGE")
cachemetrics.RecordCacheMetrics("DISK_L4_STORAGE", startGetInDisk)
cachemetrics.RecordTotalCosts("DISK_L4_STORAGE", start)

snapshotCleanStorageMissMeter.Mark(1)
if n := len(blob); n > 0 {
snapshotCleanStorageWriteMeter.Mark(int64(n))