Skip to content

Commit

Permalink
init bundle fetcher (ethereum#24)
Browse files Browse the repository at this point in the history
* high prio and low prio bundle fetcher
  • Loading branch information
bhakiyakalimuthu authored and avalonche committed Mar 9, 2023
1 parent 440f516 commit 5fe5d51
Show file tree
Hide file tree
Showing 14 changed files with 289 additions and 26 deletions.
1 change: 0 additions & 1 deletion builder/block_submission_rate_limiter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,5 +68,4 @@ func TestLimit(t *testing.T) {
t.Error("chan was not ready")
}
}

}
12 changes: 6 additions & 6 deletions builder/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/core/beacon"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/flashbotsextra"
"github.com/ethereum/go-ethereum/log"

"github.com/flashbots/go-boost-utils/bls"
Expand Down Expand Up @@ -43,23 +44,22 @@ type IBuilder interface {
}

type Builder struct {
ds IDatabaseService
ds flashbotsextra.IDatabaseService
beaconClient IBeaconClient
relay IRelay
eth IEthereumService
resubmitter Resubmitter
blockSubmissionRateLimiter *BlockSubmissionRateLimiter

builderSecretKey *bls.SecretKey
builderPublicKey boostTypes.PublicKey
builderSigningDomain boostTypes.Domain
builderSecretKey *bls.SecretKey
builderPublicKey boostTypes.PublicKey
builderSigningDomain boostTypes.Domain

bestMu sync.Mutex
bestAttrs BuilderPayloadAttributes
bestBlockProfit *big.Int
}

func NewBuilder(sk *bls.SecretKey, ds IDatabaseService, bc IBeaconClient, relay IRelay, builderSigningDomain boostTypes.Domain, eth IEthereumService) *Builder {
func NewBuilder(sk *bls.SecretKey, ds flashbotsextra.IDatabaseService, bc IBeaconClient, relay IRelay, builderSigningDomain boostTypes.Domain, eth IEthereumService) *Builder {
pkBytes := bls.PublicKeyFromSecretKey(sk).Compress()
pk := boostTypes.PublicKey{}
pk.FromSlice(pkBytes)
Expand Down
3 changes: 2 additions & 1 deletion builder/builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/core/beacon"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/flashbotsextra"
"github.com/flashbots/go-boost-utils/bls"
boostTypes "github.com/flashbots/go-boost-utils/types"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -74,7 +75,7 @@ func TestOnPayloadAttributes(t *testing.T) {

testEthService := &testEthereumService{synced: true, testExecutableData: testExecutableData, testBlock: testBlock}

builder := NewBuilder(sk, NilDbService{}, &testBeacon, &testRelay, bDomain, testEthService)
builder := NewBuilder(sk, flashbotsextra.NilDbService{}, &testBeacon, &testRelay, bDomain, testEthService)
builder.Start()
defer builder.Stop()

Expand Down
3 changes: 2 additions & 1 deletion builder/local_relay_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/core/beacon"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/flashbotsextra"
"github.com/ethereum/go-ethereum/log"
"github.com/flashbots/go-boost-utils/bls"
boostTypes "github.com/flashbots/go-boost-utils/types"
Expand All @@ -29,7 +30,7 @@ func newTestBackend(t *testing.T, forkchoiceData *beacon.ExecutableDataV1, block
beaconClient := &testBeaconClient{validator: validator}
localRelay := NewLocalRelay(sk, beaconClient, bDomain, cDomain, ForkData{}, true)
ethService := &testEthereumService{synced: true, testExecutableData: forkchoiceData, testBlock: block}
backend := NewBuilder(sk, NilDbService{}, beaconClient, localRelay, bDomain, ethService)
backend := NewBuilder(sk, flashbotsextra.NilDbService{}, beaconClient, localRelay, bDomain, ethService)
// service := NewService("127.0.0.1:31545", backend)

return backend, localRelay, validator
Expand Down
1 change: 0 additions & 1 deletion builder/resubmitter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
)

func TestResubmitter(t *testing.T) {

resubmitter := Resubmitter{}

pingCh := make(chan error)
Expand Down
27 changes: 20 additions & 7 deletions builder/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@ import (

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/eth"
"github.com/ethereum/go-ethereum/flashbotsextra"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/node"
"github.com/ethereum/go-ethereum/rpc"
Expand Down Expand Up @@ -170,16 +172,27 @@ func Register(stack *node.Node, backend *eth.Ethereum, cfg *BuilderConfig) error
return errors.New("neither local nor remote relay specified")
}

ethereumService := NewEthereumService(backend)

// TODO: move to proper flags
var ds IDatabaseService
ds, err = NewDatabaseService(os.Getenv("FLASHBOTS_POSTGRES_DSN"))
if err != nil {
log.Error("could not connect to the DB", "err", err)
ds = NilDbService{}
var ds flashbotsextra.IDatabaseService
dbDSN := os.Getenv("FLASHBOTS_POSTGRES_DSN")
if dbDSN != "" {
ds, err = flashbotsextra.NewDatabaseService(dbDSN)
if err != nil {
log.Error("could not connect to the DB", "err", err)
ds = flashbotsextra.NilDbService{}
}
} else {
log.Info("db dsn is not provided, starting nil db svc")
ds = flashbotsextra.NilDbService{}
}

// Bundle fetcher
mevBundleCh := make(chan []types.MevBundle)
blockNumCh := make(chan int64)
bundleFetcher := flashbotsextra.NewBundleFetcher(backend, ds, blockNumCh, mevBundleCh, true)
go bundleFetcher.Run()

ethereumService := NewEthereumService(backend)
builderBackend := NewBuilder(builderSk, ds, beaconClient, relay, builderSigningDomain, ethereumService)
builderService := NewService(cfg.ListenAddr, localRelay, builderBackend)

Expand Down
9 changes: 9 additions & 0 deletions core/txpool/txpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -619,6 +619,15 @@ func (pool *TxPool) MevBundles(blockNumber *big.Int, blockTimestamp uint64) ([]t
return ret, nil
}

// AddMevBundles adds a mev bundles to the pool
func (pool *TxPool) AddMevBundles(mevBundles []types.MevBundle) error {
pool.mu.Lock()
defer pool.mu.Unlock()

pool.mevBundles = append(pool.mevBundles, mevBundles...)
return nil
}

// AddMevBundle adds a mev bundle to the pool
func (pool *TxPool) AddMevBundle(txs types.Transactions, blockNumber *big.Int, minTimestamp, maxTimestamp uint64, revertingTxHashes []common.Hash) error {
bundleHasher := sha3.NewLegacyKeccak256()
Expand Down
37 changes: 37 additions & 0 deletions flashbotsextra/cmd/bundle_fetcher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package main

import (
"os"
"time"

"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/flashbotsextra"
"github.com/ethereum/go-ethereum/log"
)

func main() {
// Test bundle fetcher
log.Root().SetHandler(log.LvlFilterHandler(log.LvlInfo, log.StreamHandler(os.Stderr, log.TerminalFormat(true))))
mevBundleCh := make(chan []types.MevBundle)
blockNumCh := make(chan int64)
db, err := flashbotsextra.NewDatabaseService("postgres://postgres:postgres@localhost:5432/test?sslmode=disable")
if err != nil {
panic(err)
}
bundleFetcher := flashbotsextra.NewBundleFetcher(nil, db, blockNumCh, mevBundleCh, false)

go bundleFetcher.Run()
log.Info("waiting for mev bundles")
go func() {
blockNum := []int64{15232009, 15232008, 15232010}
for _, num := range blockNum {
<-time.After(time.Second)
blockNumCh <- num
}
}()
for bundles := range mevBundleCh {
for _, bundle := range bundles {
log.Info("bundle info", "blockNum", bundle.BlockNumber, "txsLength", len(bundle.Txs))
}
}
}
42 changes: 41 additions & 1 deletion builder/database.go → flashbotsextra/database.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package builder
package flashbotsextra

import (
"context"
Expand All @@ -13,21 +13,32 @@ import (
_ "github.com/lib/pq"
)

const (
highPrioLimitSize = 500
lowPrioLimitSize = 100
)

type IDatabaseService interface {
ConsumeBuiltBlock(block *types.Block, bundles []types.SimulatedBundle, bidTrace *boostTypes.BidTrace)
GetPriorityBundles(ctx context.Context, blockNum int64, isHighPrio bool) ([]DbBundle, error)
}

type NilDbService struct{}

func (NilDbService) ConsumeBuiltBlock(*types.Block, []types.SimulatedBundle, *boostTypes.BidTrace) {}

func (NilDbService) GetPriorityBundles(ctx context.Context, blockNum int64, isHighPrio bool) ([]DbBundle, error) {
return []DbBundle{}, nil
}

type DatabaseService struct {
db *sqlx.DB

insertBuiltBlockStmt *sqlx.NamedStmt
insertBlockBuiltBundleNoIdStmt *sqlx.NamedStmt
insertBlockBuiltBundleWithIdStmt *sqlx.NamedStmt
insertMissingBundleStmt *sqlx.NamedStmt
fetchPrioBundlesStmt *sqlx.NamedStmt
}

func NewDatabaseService(postgresDSN string) (*DatabaseService, error) {
Expand Down Expand Up @@ -56,17 +67,26 @@ func NewDatabaseService(postgresDSN string) (*DatabaseService, error) {
return nil, err
}

fetchPrioBundlesStmt, err := db.PrepareNamed("select bundle_hash, param_signed_txs, param_block_number, param_timestamp, received_timestamp, param_reverting_tx_hashes, coinbase_diff, total_gas_used, state_block_number, gas_fees, eth_sent_to_coinbase from bundles where is_high_prio = :is_high_prio and coinbase_diff*1e18/total_gas_used > 1000000000 and param_block_number = :param_block_number order by coinbase_diff/total_gas_used DESC limit :limit")
if err != nil {
return nil, err
}
return &DatabaseService{
db: db,
insertBuiltBlockStmt: insertBuiltBlockStmt,
insertBlockBuiltBundleNoIdStmt: insertBlockBuiltBundleNoIdStmt,
insertBlockBuiltBundleWithIdStmt: insertBlockBuiltBundleWithIdStmt,
insertMissingBundleStmt: insertMissingBundleStmt,
fetchPrioBundlesStmt: fetchPrioBundlesStmt,
}, nil
}

func (ds *DatabaseService) ConsumeBuiltBlock(block *types.Block, bundles []types.SimulatedBundle, bidTrace *boostTypes.BidTrace) {
tx, err := ds.db.Beginx()
if err != nil {
log.Error("could not insert built block", "err", err)
return
}

blockData := BuiltBlock{
BlockNumber: block.NumberU64(),
Expand Down Expand Up @@ -134,3 +154,23 @@ func (ds *DatabaseService) ConsumeBuiltBlock(block *types.Block, bundles []types
log.Error("could not commit DB trasnaction", "err", err)
}
}
func (ds *DatabaseService) GetPriorityBundles(ctx context.Context, blockNum int64, isHighPrio bool) ([]DbBundle, error) {
var bundles []DbBundle
tx, err := ds.db.Beginx()
if err != nil {
log.Error("failed to begin db tx for get priority bundles", "err", err)
return nil, err
}
arg := map[string]interface{}{"param_block_number": uint64(blockNum), "is_high_prio": isHighPrio, "limit": lowPrioLimitSize}
if isHighPrio {
arg["limit"] = highPrioLimitSize
}
if err = tx.NamedStmtContext(ctx, ds.fetchPrioBundlesStmt).SelectContext(ctx, &bundles, arg); err != nil {
return nil, err
}
err = tx.Commit()
if err != nil {
log.Error("could not commit GetPriorityBundles transaction", "err", err)
}
return bundles, nil
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package builder
package flashbotsextra

import (
"math/big"
Expand Down
12 changes: 6 additions & 6 deletions builder/database_types.go → flashbotsextra/database_types.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package builder
package flashbotsextra

import (
"math/big"
Expand Down Expand Up @@ -38,9 +38,9 @@ type DbBundle struct {

ParamSignedTxs string `db:"param_signed_txs"`
ParamBlockNumber uint64 `db:"param_block_number"`
ParamTimestamp uint64 `db:"param_timestamp"`
ParamTimestamp *uint64 `db:"param_timestamp"`
ReceivedTimestamp time.Time `db:"received_timestamp"`
ParamRevertingTxHashes string `db:"param_reverting_tx_hashes"`
ParamRevertingTxHashes *string `db:"param_reverting_tx_hashes"`

CoinbaseDiff string `db:"coinbase_diff"`
TotalGasUsed uint64 `db:"total_gas_used"`
Expand All @@ -54,7 +54,7 @@ func SimulatedBundleToDbBundle(bundle *types.SimulatedBundle) DbBundle {
for i, rTxHash := range bundle.OriginalBundle.RevertingTxHashes {
revertingTxHashes[i] = rTxHash.String()
}

paramRevertingTxHashes := strings.Join(revertingTxHashes, ",")
signedTxsStrings := make([]string, len(bundle.OriginalBundle.Txs))
for i, tx := range bundle.OriginalBundle.Txs {
signedTxsStrings[i] = tx.Hash().String()
Expand All @@ -65,8 +65,8 @@ func SimulatedBundleToDbBundle(bundle *types.SimulatedBundle) DbBundle {

ParamSignedTxs: strings.Join(signedTxsStrings, ","),
ParamBlockNumber: bundle.OriginalBundle.BlockNumber.Uint64(),
ParamTimestamp: bundle.OriginalBundle.MinTimestamp,
ParamRevertingTxHashes: strings.Join(revertingTxHashes, ","),
ParamTimestamp: &bundle.OriginalBundle.MinTimestamp,
ParamRevertingTxHashes: &paramRevertingTxHashes,

CoinbaseDiff: new(big.Rat).SetFrac(bundle.TotalEth, big.NewInt(1e18)).FloatString(18),
TotalGasUsed: bundle.TotalGasUsed,
Expand Down
Loading

0 comments on commit 5fe5d51

Please sign in to comment.