Skip to content

Commit

Permalink
Merge tag 'v1.2.7' into release/polygon-1.x-fh2.3
Browse files Browse the repository at this point in the history
# Conflicts:
#	params/version.go
  • Loading branch information
maoueh committed Mar 8, 2024
2 parents 58593b1 + 564981b commit acd345c
Show file tree
Hide file tree
Showing 35 changed files with 208 additions and 124 deletions.
1 change: 1 addition & 0 deletions builder/files/config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ syncmode = "full"
# json = false
# backtrace = ""
# debug = true
# enable-block-tracking = false

[p2p]
# maxpeers = 1
Expand Down
3 changes: 2 additions & 1 deletion builder/files/genesis-mainnet-v1.json
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@
"muirGlacierBlock": 3395000,
"berlinBlock": 14750000,
"londonBlock": 23850000,
"shanghaiBlock":50523000,
"shanghaiBlock": 50523000,
"cancunBlock": 54876000,
"bor": {
"jaipurBlock": 23850000,
"delhiBlock": 38189056,
Expand Down
5 changes: 5 additions & 0 deletions core/state/state_object.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"fmt"
"io"
"math/big"
"sync"
"time"

"github.com/ethereum/go-ethereum/common"
Expand Down Expand Up @@ -74,6 +75,7 @@ type stateObject struct {
trie Trie // storage trie, which becomes non-nil on first access
code Code // contract bytecode, which gets set when code is loaded

storageMutex sync.Mutex
originStorage Storage // Storage cache of original entries to dedup rewrites
pendingStorage Storage // Storage entries that need to be flushed to disk, at the end of an entire block
dirtyStorage Storage // Storage entries that have been modified in the current transaction execution, reset for every transaction
Expand Down Expand Up @@ -176,6 +178,8 @@ func (s *stateObject) GetState(key common.Hash) common.Hash {

// GetCommittedState retrieves a value from the committed account storage trie.
func (s *stateObject) GetCommittedState(key common.Hash) common.Hash {
s.storageMutex.Lock()
defer s.storageMutex.Unlock()
// If we have a pending write or clean cached, return that
if value, pending := s.pendingStorage[key]; pending {
return value
Expand All @@ -184,6 +188,7 @@ func (s *stateObject) GetCommittedState(key common.Hash) common.Hash {
if value, cached := s.originStorage[key]; cached {
return value
}

// If the object was destructed in *this* block (and potentially resurrected),
// the storage has been cleared out, and we should *not* consult the previous
// database about any storage values. The only possible alternatives are:
Expand Down
1 change: 1 addition & 0 deletions core/types/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,7 @@ type Block struct {
// inter-peer block relay.
ReceivedAt time.Time
ReceivedFrom interface{}
AnnouncedAt *time.Time
}

// "external" block encoding. used for eth protocol, etc.
Expand Down
14 changes: 7 additions & 7 deletions core/types/transaction_marshalling.go
Original file line number Diff line number Diff line change
Expand Up @@ -373,29 +373,29 @@ func (tx *Transaction) UnmarshalJSON(input []byte) error {
itx.BlobHashes = dec.BlobVersionedHashes

// signature R
var ok bool
var overflow bool
if dec.R == nil {
return errors.New("missing required field 'r' in transaction")
}
itx.R, ok = uint256.FromBig((*big.Int)(dec.R))
if !ok {
itx.R, overflow = uint256.FromBig((*big.Int)(dec.R))
if overflow {
return errors.New("'r' value overflows uint256")
}
// signature S
if dec.S == nil {
return errors.New("missing required field 's' in transaction")
}
itx.S, ok = uint256.FromBig((*big.Int)(dec.S))
if !ok {
itx.S, overflow = uint256.FromBig((*big.Int)(dec.S))
if overflow {
return errors.New("'s' value overflows uint256")
}
// signature V
vbig, err := dec.yParityValue()
if err != nil {
return err
}
itx.V, ok = uint256.FromBig(vbig)
if !ok {
itx.V, overflow = uint256.FromBig(vbig)
if overflow {
return errors.New("'v' value overflows uint256")
}
if itx.V.Sign() != 0 || itx.R.Sign() != 0 || itx.S.Sign() != 0 {
Expand Down
9 changes: 5 additions & 4 deletions docs/cli/example_config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,11 @@ devfakeauthor = false # Run miner without validator set authorization
"32000000" = "0x875500011e5eecc0c554f95d07b31cf59df4ca2505f4dbbfffa7d4e4da917c68"

[log]
vmodule = "" # Per-module verbosity: comma-separated list of <pattern>=<level> (e.g. eth/*=5,p2p=4)
json = false # Format logs with JSON
backtrace = "" # Request a stack trace at a specific logging statement (e.g. "block.go:271")
debug = true # Prepends log messages with call-site location (file and line number) - {requires some effort}
vmodule = "" # Per-module verbosity: comma-separated list of <pattern>=<level> (e.g. eth/*=5,p2p=4)
json = false # Format logs with JSON
backtrace = "" # Request a stack trace at a specific logging statement (e.g. "block.go:271")
debug = true # Prepends log messages with call-site location (file and line number)
enable-block-tracking = false # Enables additional logging of information collected while tracking block lifecycle

[p2p]
maxpeers = 50 # Maximum number of network peers (network disabled if set to 0)
Expand Down
2 changes: 2 additions & 0 deletions docs/cli/server.md
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,8 @@ The ```bor server``` command runs the Bor client.

- ```log.debug```: Prepends log messages with call-site location (file and line number) (default: false)

- ```log.enable-block-tracking```: Enables additional logging of information collected while tracking block lifecycle (default: false)

- ```log.json```: Format logs with JSON (default: false)

- ```vmodule```: Per-module verbosity: comma-separated list of <pattern>=<level> (e.g. eth/*=5,p2p=4)
Expand Down
25 changes: 13 additions & 12 deletions eth/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,18 +273,19 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) {
// Permit the downloader to use the trie cache allowance during fast sync
cacheLimit := cacheConfig.TrieCleanLimit + cacheConfig.TrieDirtyLimit + cacheConfig.SnapshotLimit
if eth.handler, err = newHandler(&handlerConfig{
Database: chainDb,
Chain: eth.blockchain,
TxPool: eth.txPool,
Merger: eth.merger,
Network: config.NetworkId,
Sync: config.SyncMode,
BloomCache: uint64(cacheLimit),
EventMux: eth.eventMux,
RequiredBlocks: config.RequiredBlocks,
EthAPI: blockChainAPI,
checker: checker,
txArrivalWait: eth.p2pServer.TxArrivalWait,
Database: chainDb,
Chain: eth.blockchain,
TxPool: eth.txPool,
Merger: eth.merger,
Network: config.NetworkId,
Sync: config.SyncMode,
BloomCache: uint64(cacheLimit),
EventMux: eth.eventMux,
RequiredBlocks: config.RequiredBlocks,
EthAPI: blockChainAPI,
checker: checker,
txArrivalWait: eth.p2pServer.TxArrivalWait,
enableBlockTracking: eth.config.EnableBlockTracking,
}); err != nil {
return nil, err
}
Expand Down
3 changes: 3 additions & 0 deletions eth/ethconfig/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,9 @@ type Config struct {

// OverrideVerkle (TODO: remove after the fork)
OverrideVerkle *big.Int `toml:",omitempty"`

// EnableBlockTracking allows logging of information collected while tracking block lifecycle
EnableBlockTracking bool
}

// CreateConsensusEngine creates a consensus engine for the given chain configuration.
Expand Down
110 changes: 72 additions & 38 deletions eth/fetcher/block_fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,18 +114,20 @@ type blockAnnounce struct {

// headerFilterTask represents a batch of headers needing fetcher filtering.
type headerFilterTask struct {
peer string // The source peer of block headers
headers []*types.Header // Collection of headers to filter
time time.Time // Arrival time of the headers
peer string // The source peer of block headers
headers []*types.Header // Collection of headers to filter
time time.Time // Arrival time of the headers
announcedTime time.Time // Announcement time of the availability of the block
}

// bodyFilterTask represents a batch of block bodies (transactions and uncles)
// needing fetcher filtering.
type bodyFilterTask struct {
peer string // The source peer of block bodies
transactions [][]*types.Transaction // Collection of transactions per block bodies
uncles [][]*types.Header // Collection of uncles per block bodies
time time.Time // Arrival time of the blocks' contents
peer string // The source peer of block bodies
transactions [][]*types.Transaction // Collection of transactions per block bodies
uncles [][]*types.Header // Collection of uncles per block bodies
time time.Time // Arrival time of the blocks' contents
announcedTime time.Time // Announcement time of the availability of the block
}

// blockOrHeaderInject represents a schedules import operation.
Expand Down Expand Up @@ -197,34 +199,38 @@ type BlockFetcher struct {
fetchingHook func([]common.Hash) // Method to call upon starting a block (eth/61) or header (eth/62) fetch
completingHook func([]common.Hash) // Method to call upon starting a block body fetch (eth/62)
importedHook func(*types.Header, *types.Block) // Method to call upon successful header or block import (both eth/61 and eth/62)

// Logging
enableBlockTracking bool // Whether to log information collected while tracking block lifecycle
}

// NewBlockFetcher creates a block fetcher to retrieve blocks based on hash announcements.
func NewBlockFetcher(light bool, getHeader HeaderRetrievalFn, getBlock blockRetrievalFn, verifyHeader headerVerifierFn, broadcastBlock blockBroadcasterFn, chainHeight chainHeightFn, insertHeaders headersInsertFn, insertChain chainInsertFn, dropPeer peerDropFn) *BlockFetcher {
func NewBlockFetcher(light bool, getHeader HeaderRetrievalFn, getBlock blockRetrievalFn, verifyHeader headerVerifierFn, broadcastBlock blockBroadcasterFn, chainHeight chainHeightFn, insertHeaders headersInsertFn, insertChain chainInsertFn, dropPeer peerDropFn, enableBlockTracking bool) *BlockFetcher {
return &BlockFetcher{
light: light,
notify: make(chan *blockAnnounce),
inject: make(chan *blockOrHeaderInject),
headerFilter: make(chan chan *headerFilterTask),
bodyFilter: make(chan chan *bodyFilterTask),
done: make(chan common.Hash),
quit: make(chan struct{}),
announces: make(map[string]int),
announced: make(map[common.Hash][]*blockAnnounce),
fetching: make(map[common.Hash]*blockAnnounce),
fetched: make(map[common.Hash][]*blockAnnounce),
completing: make(map[common.Hash]*blockAnnounce),
queue: prque.New[int64, *blockOrHeaderInject](nil),
queues: make(map[string]int),
queued: make(map[common.Hash]*blockOrHeaderInject),
getHeader: getHeader,
getBlock: getBlock,
verifyHeader: verifyHeader,
broadcastBlock: broadcastBlock,
chainHeight: chainHeight,
insertHeaders: insertHeaders,
insertChain: insertChain,
dropPeer: dropPeer,
light: light,
notify: make(chan *blockAnnounce),
inject: make(chan *blockOrHeaderInject),
headerFilter: make(chan chan *headerFilterTask),
bodyFilter: make(chan chan *bodyFilterTask),
done: make(chan common.Hash),
quit: make(chan struct{}),
announces: make(map[string]int),
announced: make(map[common.Hash][]*blockAnnounce),
fetching: make(map[common.Hash]*blockAnnounce),
fetched: make(map[common.Hash][]*blockAnnounce),
completing: make(map[common.Hash]*blockAnnounce),
queue: prque.New[int64, *blockOrHeaderInject](nil),
queues: make(map[string]int),
queued: make(map[common.Hash]*blockOrHeaderInject),
getHeader: getHeader,
getBlock: getBlock,
verifyHeader: verifyHeader,
broadcastBlock: broadcastBlock,
chainHeight: chainHeight,
insertHeaders: insertHeaders,
insertChain: insertChain,
dropPeer: dropPeer,
enableBlockTracking: enableBlockTracking,
}
}

Expand Down Expand Up @@ -276,7 +282,7 @@ func (f *BlockFetcher) Enqueue(peer string, block *types.Block) error {

// FilterHeaders extracts all the headers that were explicitly requested by the fetcher,
// returning those that should be handled differently.
func (f *BlockFetcher) FilterHeaders(peer string, headers []*types.Header, time time.Time) []*types.Header {
func (f *BlockFetcher) FilterHeaders(peer string, headers []*types.Header, time time.Time, announcedAt time.Time) []*types.Header {
log.Trace("Filtering headers", "peer", peer, "headers", len(headers))

// Send the filter channel to the fetcher
Expand All @@ -289,7 +295,7 @@ func (f *BlockFetcher) FilterHeaders(peer string, headers []*types.Header, time
}
// Request the filtering of the header list
select {
case filter <- &headerFilterTask{peer: peer, headers: headers, time: time}:
case filter <- &headerFilterTask{peer: peer, headers: headers, time: time, announcedTime: announcedAt}:
case <-f.quit:
return nil
}
Expand All @@ -304,7 +310,7 @@ func (f *BlockFetcher) FilterHeaders(peer string, headers []*types.Header, time

// FilterBodies extracts all the block bodies that were explicitly requested by
// the fetcher, returning those that should be handled differently.
func (f *BlockFetcher) FilterBodies(peer string, transactions [][]*types.Transaction, uncles [][]*types.Header, time time.Time) ([][]*types.Transaction, [][]*types.Header) {
func (f *BlockFetcher) FilterBodies(peer string, transactions [][]*types.Transaction, uncles [][]*types.Header, time time.Time, announcedAt time.Time) ([][]*types.Transaction, [][]*types.Header) {
log.Trace("Filtering bodies", "peer", peer, "txs", len(transactions), "uncles", len(uncles))

// Send the filter channel to the fetcher
Expand All @@ -317,7 +323,7 @@ func (f *BlockFetcher) FilterBodies(peer string, transactions [][]*types.Transac
}
// Request the filtering of the body list
select {
case filter <- &bodyFilterTask{peer: peer, transactions: transactions, uncles: uncles, time: time}:
case filter <- &bodyFilterTask{peer: peer, transactions: transactions, uncles: uncles, time: time, announcedTime: announcedAt}:
case <-f.quit:
return nil, nil
}
Expand Down Expand Up @@ -480,7 +486,7 @@ func (f *BlockFetcher) loop() {
log.Trace("Fetching scheduled headers", "peer", peer, "list", hashes)

// Create a closure of the fetch and schedule in on a new thread
fetchHeader, hashes := f.fetching[hashes[0]].fetchHeader, hashes
fetchHeader, hashes, announcedAt := f.fetching[hashes[0]].fetchHeader, hashes, f.fetching[hashes[0]].time
go func(peer string) {
if f.fetchingHook != nil {
f.fetchingHook(hashes)
Expand All @@ -504,7 +510,7 @@ func (f *BlockFetcher) loop() {
select {
case res := <-resCh:
res.Done <- nil
f.FilterHeaders(peer, *res.Res.(*eth.BlockHeadersPacket), time.Now().Add(res.Time))
f.FilterHeaders(peer, *res.Res.(*eth.BlockHeadersPacket), time.Now(), announcedAt)

case <-timeout.C:
// The peer didn't respond in time. The request
Expand Down Expand Up @@ -547,6 +553,7 @@ func (f *BlockFetcher) loop() {

fetchBodies := f.completing[hashes[0]].fetchBodies
bodyFetchMeter.Mark(int64(len(hashes)))
announcedAt := f.completing[hashes[0]].time

go func(peer string, hashes []common.Hash) {
resCh := make(chan *eth.Response)
Expand All @@ -565,7 +572,7 @@ func (f *BlockFetcher) loop() {
res.Done <- nil
// Ignoring withdrawals here, since the block fetcher is not used post-merge.
txs, uncles, _ := res.Res.(*eth.BlockBodiesPacket).Unpack()
f.FilterBodies(peer, txs, uncles, time.Now())
f.FilterBodies(peer, txs, uncles, time.Now(), announcedAt)

case <-timeout.C:
// The peer didn't respond in time. The request
Expand Down Expand Up @@ -631,6 +638,7 @@ func (f *BlockFetcher) loop() {

block := types.NewBlockWithHeader(header)
block.ReceivedAt = task.time
block.AnnouncedAt = &task.announcedTime

complete = append(complete, block)
f.completing[hash] = announce
Expand Down Expand Up @@ -725,6 +733,7 @@ func (f *BlockFetcher) loop() {
if f.getBlock(hash) == nil {
block := types.NewBlockWithHeader(announce.header).WithBody(task.transactions[i], task.uncles[i])
block.ReceivedAt = task.time
block.AnnouncedAt = &task.announcedTime
blocks = append(blocks, block)
} else {
f.forgetHash(hash)
Expand Down Expand Up @@ -923,6 +932,31 @@ func (f *BlockFetcher) importBlocks(peer string, block *types.Block) {
log.Debug("Propagated block import failed", "peer", peer, "number", block.Number(), "hash", hash, "err", err)
return
}

if f.enableBlockTracking {
// Log the insertion event
var (
msg string
delayInMs uint64
prettyDelay common.PrettyDuration
)

if block.AnnouncedAt != nil {
msg = "[block tracker] Inserted new block with announcement"
delayInMs = uint64(time.Since(*block.AnnouncedAt).Milliseconds())
prettyDelay = common.PrettyDuration(time.Since(*block.AnnouncedAt))
} else {
msg = "[block tracker] Inserted new block without announcement"
delayInMs = uint64(time.Since(block.ReceivedAt).Milliseconds())
prettyDelay = common.PrettyDuration(time.Since(block.ReceivedAt))
}

totalDelayInMs := uint64(time.Now().UnixMilli()) - block.Time()*1000
totalDelay := common.PrettyDuration(time.Millisecond * time.Duration(totalDelayInMs))

log.Info(msg, "number", block.Number().Uint64(), "hash", hash, "delay", prettyDelay, "delayInMs", delayInMs, "totalDelay", totalDelay, "totalDelayInMs", totalDelayInMs)
}

// If import succeeded, broadcast the block
blockAnnounceOutTimer.UpdateSince(block.ReceivedAt)

Expand Down
2 changes: 1 addition & 1 deletion eth/fetcher/block_fetcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ func newTester(light bool) *fetcherTester {
blocks: map[common.Hash]*types.Block{genesis.Hash(): genesis},
drops: make(map[string]bool),
}
tester.fetcher = NewBlockFetcher(light, tester.getHeader, tester.getBlock, tester.verifyHeader, tester.broadcastBlock, tester.chainHeight, tester.insertHeaders, tester.insertChain, tester.dropPeer)
tester.fetcher = NewBlockFetcher(light, tester.getHeader, tester.getBlock, tester.verifyHeader, tester.broadcastBlock, tester.chainHeight, tester.insertHeaders, tester.insertChain, tester.dropPeer, false)
tester.fetcher.Start()

return tester
Expand Down
Loading

0 comments on commit acd345c

Please sign in to comment.