-
Notifications
You must be signed in to change notification settings - Fork 1
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat: indexer indexes blocks and txs of mev-commit chain to any plugg…
…able storage
- Loading branch information
kant
committed
Jul 31, 2024
1 parent
fca7319
commit 4e2c562
Showing
12 changed files
with
1,138 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
# indexer |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,319 @@ | ||
package main | ||
|
||
import ( | ||
"context" | ||
"encoding/hex" | ||
"fmt" | ||
"log/slog" | ||
"math/big" | ||
"time" | ||
|
||
"github.com/ethereum/go-ethereum/core/types" | ||
|
||
"github.com/primev/mev-commit/indexer/pkg/ethclient" | ||
"github.com/primev/mev-commit/indexer/pkg/store" | ||
) | ||
|
||
type BlockchainIndexer struct { | ||
ethClient ethclient.EthereumClient | ||
storage store.Storage | ||
forwardBlockChan chan *types.Block | ||
backwardBlockChan chan *types.Block | ||
txChan chan *types.Transaction | ||
indexInterval time.Duration | ||
lastForwardIndexedBlock *big.Int | ||
lastBackwardIndexedBlock *big.Int | ||
logger *slog.Logger | ||
} | ||
|
||
func NewBlockchainIndexer(ethClient ethclient.EthereumClient, storage store.Storage, indexInterval time.Duration) *BlockchainIndexer { | ||
return &BlockchainIndexer{ | ||
ethClient: ethClient, | ||
storage: storage, | ||
forwardBlockChan: make(chan *types.Block, 100), | ||
backwardBlockChan: make(chan *types.Block, 100), | ||
txChan: make(chan *types.Transaction, 100), | ||
indexInterval: indexInterval, | ||
logger: slog.Default(), | ||
} | ||
} | ||
|
||
func (bi *BlockchainIndexer) Start(ctx context.Context) error { | ||
if err := bi.storage.CreateIndices(ctx); err != nil { | ||
return fmt.Errorf("failed to create indices: %w", err) | ||
} | ||
|
||
latestBlockNumber, err := bi.ethClient.BlockNumber(ctx) | ||
bi.logger.Info("latest block number", "block number", latestBlockNumber) | ||
if err != nil { | ||
return fmt.Errorf("failed to get latest block number: %w", err) | ||
} | ||
|
||
if err = bi.initializeForwardIndex(ctx, latestBlockNumber.Uint64()); err != nil { | ||
return err | ||
} | ||
|
||
if err = bi.initializeBackwardIndex(ctx, latestBlockNumber.Uint64()); err != nil { | ||
return err | ||
} | ||
|
||
go bi.fetchForwardBlocks(ctx) | ||
go bi.processForwardBlocks(ctx) | ||
go bi.fetchBackwardBlocks(ctx) | ||
go bi.processBackwardBlocks(ctx) | ||
|
||
// Block the main function indefinitely | ||
select {} | ||
} | ||
|
||
func (bi *BlockchainIndexer) initializeForwardIndex(ctx context.Context, latestBlockNumber uint64) error { | ||
lastForwardIndexedBlock, err := bi.storage.GetLastIndexedBlock(ctx, "forward") | ||
if err != nil { | ||
return fmt.Errorf("failed to get last forward indexed block: %w", err) | ||
} | ||
|
||
bi.logger.Info("last indexed block", "blockNumber", lastForwardIndexedBlock, "direction", "forward") | ||
|
||
if lastForwardIndexedBlock == nil || lastForwardIndexedBlock.Cmp(big.NewInt(0)) == 0 { | ||
bi.lastForwardIndexedBlock = new(big.Int).SetUint64(latestBlockNumber - 1) | ||
} else { | ||
bi.lastForwardIndexedBlock = lastForwardIndexedBlock | ||
} | ||
|
||
return nil | ||
} | ||
|
||
func (bi *BlockchainIndexer) initializeBackwardIndex(ctx context.Context, latestBlockNumber uint64) error { | ||
lastBackwardIndexedBlock, err := bi.storage.GetLastIndexedBlock(ctx, "backward") | ||
if err != nil { | ||
return fmt.Errorf("failed to get last backward indexed block: %w", err) | ||
} | ||
|
||
bi.logger.Info("last indexed block", "blockNumber", lastBackwardIndexedBlock, "direction", "backward") | ||
|
||
if lastBackwardIndexedBlock == nil || lastBackwardIndexedBlock.Cmp(big.NewInt(0)) == 0 { | ||
bi.lastBackwardIndexedBlock = new(big.Int).SetUint64(latestBlockNumber) | ||
} else { | ||
bi.lastBackwardIndexedBlock = lastBackwardIndexedBlock | ||
} | ||
|
||
return nil | ||
} | ||
|
||
func (bi *BlockchainIndexer) fetchForwardBlocks(ctx context.Context) { | ||
ticker := time.NewTicker(bi.indexInterval) | ||
defer ticker.Stop() | ||
|
||
for { | ||
select { | ||
case <-ctx.Done(): | ||
return | ||
case <-ticker.C: | ||
latestBlockNumber, err := bi.ethClient.BlockNumber(ctx) | ||
if err != nil { | ||
bi.logger.Error("failed to get latest block number", "error", err) | ||
continue | ||
} | ||
|
||
for blockNum := new(big.Int).Add(bi.lastForwardIndexedBlock, big.NewInt(1)); blockNum.Cmp(latestBlockNumber) <= 0; blockNum.Add(blockNum, big.NewInt(5)) { | ||
endBlockNum := new(big.Int).Add(blockNum, big.NewInt(4)) | ||
if endBlockNum.Cmp(latestBlockNumber) > 0 { | ||
endBlockNum.Set(latestBlockNumber) | ||
} | ||
|
||
blockNums := []*big.Int{} | ||
for bn := new(big.Int).Set(blockNum); bn.Cmp(endBlockNum) <= 0; bn.Add(bn, big.NewInt(1)) { | ||
blockNums = append(blockNums, new(big.Int).Set(bn)) | ||
} | ||
|
||
blocks, err := bi.fetchBlocks(ctx, blockNums) | ||
if err != nil { | ||
bi.logger.Error("failed to fetch blocks", "start", blockNum, "end", endBlockNum, "error", err) | ||
continue | ||
} | ||
|
||
for _, block := range blocks { | ||
bi.forwardBlockChan <- block | ||
bi.lastForwardIndexedBlock.Set(block.Number()) | ||
} | ||
} | ||
} | ||
} | ||
} | ||
|
||
func (bi *BlockchainIndexer) fetchBackwardBlocks(ctx context.Context) { | ||
ticker := time.NewTicker(bi.indexInterval) | ||
defer ticker.Stop() | ||
|
||
for { | ||
select { | ||
case <-ctx.Done(): | ||
return | ||
case <-ticker.C: | ||
if bi.lastBackwardIndexedBlock.Sign() <= 0 { | ||
return | ||
} | ||
zeroBigNum := big.NewInt(0) | ||
blockNum := new(big.Int).Sub(bi.lastBackwardIndexedBlock, big.NewInt(1)) | ||
|
||
for i := 0; blockNum.Cmp(zeroBigNum) >= 0; i++ { | ||
endBlockNum := new(big.Int).Sub(blockNum, big.NewInt(4)) | ||
if endBlockNum.Cmp(zeroBigNum) < 0 { | ||
endBlockNum.Set(zeroBigNum) | ||
} | ||
|
||
blockNums := []*big.Int{} | ||
for bn := new(big.Int).Set(blockNum); bn.Cmp(endBlockNum) >= 0; bn.Sub(bn, big.NewInt(1)) { | ||
blockNums = append(blockNums, new(big.Int).Set(bn)) | ||
} | ||
|
||
blocks, err := bi.fetchBlocks(ctx, blockNums) | ||
if err != nil { | ||
bi.logger.Error("failed to fetch blocks", "start", blockNum, "end", endBlockNum, "error", err) | ||
break | ||
} | ||
|
||
for _, block := range blocks { | ||
bi.backwardBlockChan <- block | ||
bi.lastBackwardIndexedBlock.Set(block.Number()) | ||
if block.Number().Cmp(zeroBigNum) == 0 { | ||
bi.logger.Info("done fetching backward blocks...") | ||
return | ||
} | ||
} | ||
blockNum.Sub(endBlockNum, big.NewInt(1)) | ||
} | ||
} | ||
} | ||
} | ||
|
||
func (bi *BlockchainIndexer) processForwardBlocks(ctx context.Context) { | ||
for { | ||
select { | ||
case <-ctx.Done(): | ||
return | ||
case block := <-bi.forwardBlockChan: | ||
if err := bi.indexBlock(ctx, block); err != nil { | ||
bi.logger.Error("failed to index block", "error", err) | ||
} | ||
if err := bi.indexTransactions(ctx, block); err != nil { | ||
bi.logger.Error("failed to index transactions", "error", err) | ||
} | ||
} | ||
} | ||
} | ||
|
||
func (bi *BlockchainIndexer) processBackwardBlocks(ctx context.Context) { | ||
for { | ||
select { | ||
case <-ctx.Done(): | ||
return | ||
case block := <-bi.backwardBlockChan: | ||
if err := bi.indexBlock(ctx, block); err != nil { | ||
bi.logger.Error("failed to index block", "error", err) | ||
} | ||
if err := bi.indexTransactions(ctx, block); err != nil { | ||
bi.logger.Error("failed to index transactions", "error", err) | ||
} | ||
if block.Number().Cmp(big.NewInt(0)) == 0 { | ||
bi.logger.Info("done processing backward blocks...") | ||
return | ||
} | ||
} | ||
} | ||
} | ||
|
||
func (bi *BlockchainIndexer) indexBlock(ctx context.Context, block *types.Block) error { | ||
timestamp := time.UnixMilli(int64(block.Time())).UTC().Format("2006-01-02T15:04:05.000Z") | ||
indexBlock := &store.IndexBlock{ | ||
Number: block.NumberU64(), | ||
Hash: block.Hash().Hex(), | ||
ParentHash: block.ParentHash().Hex(), | ||
Root: block.Root().Hex(), | ||
Nonce: block.Nonce(), | ||
Timestamp: timestamp, | ||
Transactions: len(block.Transactions()), | ||
BaseFee: block.BaseFee().Uint64(), | ||
GasLimit: block.GasLimit(), | ||
GasUsed: block.GasUsed(), | ||
Difficulty: block.Difficulty().Uint64(), | ||
ExtraData: hex.EncodeToString(block.Extra()), | ||
} | ||
|
||
return bi.storage.IndexBlock(ctx, indexBlock) | ||
} | ||
|
||
func (bi *BlockchainIndexer) indexTransactions(ctx context.Context, block *types.Block) error { | ||
var transactions []*store.IndexTransaction | ||
var txHashes []string | ||
|
||
for _, tx := range block.Transactions() { | ||
from, err := types.Sender(types.NewCancunSigner(tx.ChainId()), tx) | ||
if err != nil { | ||
return fmt.Errorf("failed to derive sender: %w", err) | ||
} | ||
|
||
v, r, s := tx.RawSignatureValues() | ||
timestamp := tx.Time().UTC().Format("2006-01-02T15:04:05.000Z") | ||
transaction := &store.IndexTransaction{ | ||
Hash: tx.Hash().Hex(), | ||
From: from.Hex(), | ||
Gas: tx.Gas(), | ||
Nonce: tx.Nonce(), | ||
BlockHash: block.Hash().Hex(), | ||
BlockNumber: block.NumberU64(), | ||
ChainId: tx.ChainId().String(), | ||
V: v.String(), | ||
R: r.String(), | ||
S: s.String(), | ||
Input: hex.EncodeToString(tx.Data()), | ||
Timestamp: timestamp, | ||
} | ||
|
||
if tx.To() != nil { | ||
transaction.To = tx.To().Hex() | ||
} | ||
if tx.GasPrice() != nil { | ||
transaction.GasPrice = tx.GasPrice().Uint64() | ||
} | ||
if tx.GasTipCap() != nil { | ||
transaction.GasTipCap = tx.GasTipCap().Uint64() | ||
} | ||
if tx.GasFeeCap() != nil { | ||
transaction.GasFeeCap = tx.GasFeeCap().Uint64() | ||
} | ||
if tx.Value() != nil { | ||
transaction.Value = tx.Value().String() | ||
} | ||
|
||
transactions = append(transactions, transaction) | ||
txHashes = append(txHashes, tx.Hash().Hex()) | ||
} | ||
|
||
receipts, err := bi.fetchReceipts(ctx, txHashes) | ||
if err != nil { | ||
return fmt.Errorf("failed to fetch transaction receipts: %w", err) | ||
} | ||
|
||
for _, tx := range transactions { | ||
if receipt, ok := receipts[tx.Hash]; ok { | ||
tx.Status = receipt.Status | ||
tx.GasUsed = receipt.GasUsed | ||
tx.CumulativeGasUsed = receipt.CumulativeGasUsed | ||
tx.ContractAddress = receipt.ContractAddress.Hex() | ||
tx.TransactionIndex = receipt.TransactionIndex | ||
tx.ReceiptBlockHash = receipt.BlockHash.Hex() | ||
tx.ReceiptBlockNumber = receipt.BlockNumber.Uint64() | ||
} | ||
} | ||
|
||
return bi.storage.IndexTransactions(ctx, transactions) | ||
} | ||
|
||
func (bi *BlockchainIndexer) fetchReceipts(ctx context.Context, txHashes []string) (map[string]*types.Receipt, error) { | ||
return bi.ethClient.TxReceipts(ctx, txHashes) | ||
} | ||
|
||
func (bi *BlockchainIndexer) fetchBlocks(ctx context.Context, blockNums []*big.Int) ([]*types.Block, error) { | ||
return bi.ethClient.GetBlocks(ctx, blockNums) | ||
} |
Oops, something went wrong.