Skip to content

Commit

Permalink
feat: support blob storage & miscs; (#2229)
Browse files Browse the repository at this point in the history
* chainconfig: use cancun fork for BSC;
feat: fill WithdrawalsHash when BSC enable cancun fork;

* rawdb: support to CRUD blobs;
freezer: support to freeze block blobs;

* freezer: support to freeze block blobs;

* blockchain: add blob cache & blob query helper;

* blockchain: add blob cache & blob query helper;

* freezer: refactor addition table logic, add uts;

* blobexpiry: add more extra expiry time, and logs;

* ci: fix UT fails;

* fix: fix some PR review comments;

* parlia: implement IsDataAvailable function;

* blob: refactor blob transfer logic;

* blob: support config blob extra reserve;
  • Loading branch information
galaio committed Mar 6, 2024
1 parent 34cb9b9 commit 6498101
Show file tree
Hide file tree
Showing 34 changed files with 1,078 additions and 42 deletions.
1 change: 1 addition & 0 deletions cmd/geth/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,7 @@ var (
utils.VoteJournalDirFlag,
utils.LogDebugFlag,
utils.LogBacktraceAtFlag,
utils.BlobExtraReserveFlag,
}, utils.NetworkFlags, utils.DatabaseFlags)

rpcFlags = []cli.Flag{
Expand Down
12 changes: 12 additions & 0 deletions cmd/utils/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -1094,6 +1094,13 @@ Please note that --` + MetricsHTTPFlag.Name + ` must be set to start the server.
Usage: "Path for the voteJournal dir in fast finality feature (default = inside the datadir)",
Category: flags.FastFinalityCategory,
}

// Blob setting
BlobExtraReserveFlag = &cli.Int64Flag{
Name: "blob.extra-reserve",
Usage: "Extra reserve threshold for blob, blob never expires when -1 is set, default 28800",
Value: params.BlobExtraReserveThreshold,
}
)

var (
Expand Down Expand Up @@ -2112,6 +2119,11 @@ func SetEthConfig(ctx *cli.Context, stack *node.Node, cfg *ethconfig.Config) {
if err := kzg4844.UseCKZG(ctx.String(CryptoKZGFlag.Name) == "ckzg"); err != nil {
Fatalf("Failed to set KZG library implementation to %s: %v", ctx.String(CryptoKZGFlag.Name), err)
}

// blob setting
if ctx.IsSet(BlobExtraReserveFlag.Name) {
cfg.BlobExtraReserve = ctx.Int64(BlobExtraReserveFlag.Name)
}
}

// SetDNSDiscoveryDefaults configures DNS discovery with the given URL if
Expand Down
1 change: 1 addition & 0 deletions consensus/consensus.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,4 +157,5 @@ type PoSA interface {
GetFinalizedHeader(chain ChainHeaderReader, header *types.Header) *types.Header
VerifyVote(chain ChainHeaderReader, vote *types.VoteEnvelope) error
IsActiveValidatorAt(chain ChainHeaderReader, header *types.Header, checkVoteKeyFn func(bLSPublicKey *types.BLSPublicKey) bool) bool
IsDataAvailable(chain ChainHeaderReader, block *types.Block) error
}
40 changes: 40 additions & 0 deletions consensus/parlia/blob_sidecar.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package parlia

import (
"crypto/sha256"
"fmt"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto/kzg4844"
)

// validateBlobSidecar it is same as validateBlobSidecar in core/txpool/validation.go
func validateBlobSidecar(hashes []common.Hash, sidecar *types.BlobTxSidecar) error {
if len(sidecar.Blobs) != len(hashes) {
return fmt.Errorf("invalid number of %d blobs compared to %d blob hashes", len(sidecar.Blobs), len(hashes))
}
if len(sidecar.Commitments) != len(hashes) {
return fmt.Errorf("invalid number of %d blob commitments compared to %d blob hashes", len(sidecar.Commitments), len(hashes))
}
if len(sidecar.Proofs) != len(hashes) {
return fmt.Errorf("invalid number of %d blob proofs compared to %d blob hashes", len(sidecar.Proofs), len(hashes))
}
// Blob quantities match up, validate that the provers match with the
// transaction hash before getting to the cryptography
hasher := sha256.New()
for i, vhash := range hashes {
computed := kzg4844.CalcBlobHashV1(hasher, &sidecar.Commitments[i])
if vhash != computed {
return fmt.Errorf("blob %d: computed hash %#x mismatches transaction one %#x", i, computed, vhash)
}
}
// Blob commitments match with the hashes in the transaction, verify the
// blobs themselves via KZG
for i := range sidecar.Blobs {
if err := kzg4844.VerifyBlobProof(sidecar.Blobs[i], sidecar.Commitments[i], sidecar.Proofs[i]); err != nil {
return fmt.Errorf("invalid blob %d: %v", i, err)
}
}
return nil
}
60 changes: 49 additions & 11 deletions consensus/parlia/parlia.go
Original file line number Diff line number Diff line change
Expand Up @@ -350,6 +350,46 @@ func (p *Parlia) VerifyHeaders(chain consensus.ChainHeaderReader, headers []*typ
return abort, results
}

// IsDataAvailable it checks that the blobTx block has available blob data
func (p *Parlia) IsDataAvailable(chain consensus.ChainHeaderReader, block *types.Block) error {
if !p.chainConfig.IsCancun(block.Number(), block.Time()) {
return nil
}
// only required to check within BlobReserveThreshold block's DA
currentHeader := chain.CurrentHeader()
if block.NumberU64() < currentHeader.Number.Uint64()-params.BlobReserveThreshold {
return nil
}

// alloc block's versionedHashes
versionedHashes := make([][]common.Hash, 0, len(block.Transactions()))
for _, tx := range block.Transactions() {
versionedHashes = append(versionedHashes, tx.BlobHashes())
}
blobs := block.Blobs()
if len(versionedHashes) != len(blobs) {
return errors.New("blobs do not match the versionedHashes length")
}

// check blob amount
blobCnt := 0
for _, h := range versionedHashes {
blobCnt += len(h)
}
if blobCnt > params.MaxBlobGasPerBlock/params.BlobTxBlobGasPerBlob {
return fmt.Errorf("too many blobs in block: have %d, permitted %d", blobCnt, params.MaxBlobGasPerBlock/params.BlobTxBlobGasPerBlob)
}

// check blob and versioned hash
for i := range versionedHashes {
if err := validateBlobSidecar(versionedHashes[i], blobs[i]); err != nil {
return err
}
}

return nil
}

// getValidatorBytesFromHeader returns the validators bytes extracted from the header's extra field if exists.
// The validators bytes would be contained only in the epoch block's header, and its each validator bytes length is fixed.
// On luban fork, we introduce vote attestation into the header's extra field, so extra format is different from before.
Expand Down Expand Up @@ -598,20 +638,18 @@ func (p *Parlia) verifyHeader(chain consensus.ChainHeaderReader, header *types.H
return fmt.Errorf("invalid excessBlobGas: have %d, expected nil", header.ExcessBlobGas)
case header.BlobGasUsed != nil:
return fmt.Errorf("invalid blobGasUsed: have %d, expected nil", header.BlobGasUsed)
case header.ParentBeaconRoot != nil:
return fmt.Errorf("invalid parentBeaconRoot, have %#x, expected nil", header.ParentBeaconRoot)
case header.WithdrawalsHash != nil:
return fmt.Errorf("invalid WithdrawalsHash, have %#x, expected nil", header.WithdrawalsHash)
}
} else {
if err := eip4844.VerifyEIP4844Header(parent, header); err != nil {
return err
switch {
case header.ParentBeaconRoot != nil:
return fmt.Errorf("invalid parentBeaconRoot, have %#x, expected nil", header.ParentBeaconRoot)
case *header.WithdrawalsHash != common.Hash{}:
return errors.New("header has wrong WithdrawalsHash")
}
}

if !cancun && header.ExcessBlobGas != nil {
return fmt.Errorf("invalid excessBlobGas: have %d, expected nil", header.ExcessBlobGas)
}
if !cancun && header.BlobGasUsed != nil {
return fmt.Errorf("invalid blobGasUsed: have %d, expected nil", header.BlobGasUsed)
}
if cancun {
if err := eip4844.VerifyEIP4844Header(parent, header); err != nil {
return err
}
Expand Down
31 changes: 31 additions & 0 deletions core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ const (
blockCacheLimit = 256
diffLayerCacheLimit = 1024
receiptsCacheLimit = 10000
blobsCacheLimit = 10000
txLookupCacheLimit = 1024
maxBadBlockLimit = 16
maxFutureBlocks = 256
Expand Down Expand Up @@ -277,6 +278,7 @@ type BlockChain struct {
receiptsCache *lru.Cache[common.Hash, []*types.Receipt]
blockCache *lru.Cache[common.Hash, *types.Block]
txLookupCache *lru.Cache[common.Hash, txLookup]
blobsCache *lru.Cache[common.Hash, types.BlobTxSidecars]

// future blocks are blocks added for later processing
futureBlocks *lru.Cache[common.Hash, *types.Block]
Expand Down Expand Up @@ -358,6 +360,7 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, genesis *Genesis
bodyCache: lru.NewCache[common.Hash, *types.Body](bodyCacheLimit),
bodyRLPCache: lru.NewCache[common.Hash, rlp.RawValue](bodyCacheLimit),
receiptsCache: lru.NewCache[common.Hash, []*types.Receipt](receiptsCacheLimit),
blobsCache: lru.NewCache[common.Hash, types.BlobTxSidecars](blobsCacheLimit),
blockCache: lru.NewCache[common.Hash, *types.Block](blockCacheLimit),
txLookupCache: lru.NewCache[common.Hash, txLookup](txLookupCacheLimit),
futureBlocks: lru.NewCache[common.Hash, *types.Block](maxFutureBlocks),
Expand Down Expand Up @@ -989,6 +992,7 @@ func (bc *BlockChain) setHeadBeyondRoot(head uint64, time uint64, root common.Ha
bc.bodyCache.Purge()
bc.bodyRLPCache.Purge()
bc.receiptsCache.Purge()
bc.blobsCache.Purge()
bc.blockCache.Purge()
bc.txLookupCache.Purge()
bc.futureBlocks.Purge()
Expand Down Expand Up @@ -1376,6 +1380,10 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [

// Write all chain data to ancients.
td := bc.GetTd(first.Hash(), first.NumberU64())
// TODO(GalaIO): when sync the history block, it needs store blobs too.
//if isCancun() {
// writeSize, err := rawdb.WriteAncientBlocksWithBlobs(bc.db, blockChain, receiptChain, td, blobs)
//}
writeSize, err := rawdb.WriteAncientBlocks(bc.db, blockChain, receiptChain, td)
if err != nil {
log.Error("Error importing chain data to ancients", "err", err)
Expand Down Expand Up @@ -1454,6 +1462,10 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [
// Write all the data out into the database
rawdb.WriteBody(batch, block.Hash(), block.NumberU64(), block.Body())
rawdb.WriteReceipts(batch, block.Hash(), block.NumberU64(), receiptChain[i])
// TODO(GalaIO): if enable cancun, need write blobs
//if bc.chainConfig.IsCancun(block.Number(), block.Time()) {
// rawdb.WriteBlobs(batch, block.Hash(), block.NumberU64(), blobs)
//}

// Write everything belongs to the blocks into the database. So that
// we can ensure all components of body is completed(body, receipts)
Expand Down Expand Up @@ -1523,6 +1535,10 @@ func (bc *BlockChain) writeBlockWithoutState(block *types.Block, td *big.Int) (e
batch := bc.db.NewBatch()
rawdb.WriteTd(batch, block.Hash(), block.NumberU64(), td)
rawdb.WriteBlock(batch, block)
// if enable cancun, it needs to write blobs too
if bc.chainConfig.IsCancun(block.Number(), block.Time()) {
rawdb.WriteBlobs(batch, block.Hash(), block.NumberU64(), block.Blobs())
}
if err := batch.Write(); err != nil {
log.Crit("Failed to write block into disk", "err", err)
}
Expand Down Expand Up @@ -1565,6 +1581,10 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types.
rawdb.WriteTd(blockBatch, block.Hash(), block.NumberU64(), externTd)
rawdb.WriteBlock(blockBatch, block)
rawdb.WriteReceipts(blockBatch, block.Hash(), block.NumberU64(), receipts)
// if enable cancun, it needs to write blobs too
if bc.chainConfig.IsCancun(block.Number(), block.Time()) {
rawdb.WriteBlobs(blockBatch, block.Hash(), block.NumberU64(), block.Blobs())
}
rawdb.WritePreimages(blockBatch, state.Preimages())
if err := blockBatch.Write(); err != nil {
log.Crit("Failed to write block into disk", "err", err)
Expand Down Expand Up @@ -1802,6 +1822,7 @@ func (bc *BlockChain) InsertChain(chain types.Blocks) (int, error) {
// racey behaviour. If a sidechain import is in progress, and the historic state
// is imported, but then new canon-head is added before the actual sidechain
// completes, then the historic state could be pruned again
// TODO(GalaIO): if enable cancun, it must set received blob cache for check, remove cache when failed
func (bc *BlockChain) insertChain(chain types.Blocks, setHead bool) (int, error) {
// If the chain is terminating, don't even bother starting up.
if bc.insertStopped() {
Expand Down Expand Up @@ -1984,6 +2005,16 @@ func (bc *BlockChain) insertChain(chain types.Blocks, setHead bool) (int, error)
if parent == nil {
parent = bc.GetHeader(block.ParentHash(), block.NumberU64()-1)
}

// check blob data available first
// TODO(GalaIO): move IsDataAvailable combine into verifyHeaders?
if bc.chainConfig.IsCancun(block.Number(), block.Time()) {
if posa, ok := bc.engine.(consensus.PoSA); ok {
if err = posa.IsDataAvailable(bc, block); err != nil {
return it.index, err
}
}
}
statedb, err := state.NewWithSharedPool(parent.Root, bc.stateCache, bc.snaps)
if err != nil {
return it.index, err
Expand Down
17 changes: 17 additions & 0 deletions core/blockchain_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,23 @@ func (bc *BlockChain) GetReceiptsByHash(hash common.Hash) types.Receipts {
return receipts
}

// GetBlobsByHash retrieves the blobs for all transactions in a given block.
func (bc *BlockChain) GetBlobsByHash(hash common.Hash) types.BlobTxSidecars {
if blobs, ok := bc.blobsCache.Get(hash); ok {
return blobs
}
number := rawdb.ReadHeaderNumber(bc.db, hash)
if number == nil {
return nil
}
blobs := rawdb.ReadRawBlobs(bc.db, hash, *number)
if blobs == nil {
return nil
}
bc.blobsCache.Add(hash, blobs)
return blobs
}

// GetUnclesInChain retrieves all the uncles from a given block backwards until
// a specific distance is reached.
func (bc *BlockChain) GetUnclesInChain(block *types.Block, length int) []*types.Header {
Expand Down
3 changes: 3 additions & 0 deletions core/chain_makers.go
Original file line number Diff line number Diff line change
Expand Up @@ -457,6 +457,9 @@ func (cm *chainMaker) makeHeader(parent *types.Block, state *state.StateDB, engi
header.ExcessBlobGas = &excessBlobGas
header.BlobGasUsed = new(uint64)
header.ParentBeaconRoot = new(common.Hash)
if cm.config.Parlia != nil {
header.WithdrawalsHash = new(common.Hash)
}
}
return header
}
Expand Down
3 changes: 3 additions & 0 deletions core/genesis.go
Original file line number Diff line number Diff line change
Expand Up @@ -509,6 +509,9 @@ func (g *Genesis) ToBlock() *types.Block {
if head.BlobGasUsed == nil {
head.BlobGasUsed = new(uint64)
}
if conf.Parlia != nil {
head.WithdrawalsHash = new(common.Hash)
}
}
}
return types.NewBlock(head, nil, nil, nil, trie.NewStackTrie(nil)).WithWithdrawals(withdrawals)
Expand Down
Loading

0 comments on commit 6498101

Please sign in to comment.