Skip to content

Commit

Permalink
fix(eth): move EthSendRawTransactionUntrusted off main EthModuleAPI
Browse files Browse the repository at this point in the history
Move it to EthAPI so the GatewayAPI doesn't have to implement it.
  • Loading branch information
rvagg committed Sep 5, 2024
1 parent f4c8523 commit f56b61d
Show file tree
Hide file tree
Showing 6 changed files with 113 additions and 42 deletions.
1 change: 1 addition & 0 deletions gateway/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,7 @@ var (
_ full.GasModuleAPI = (*Node)(nil)
_ full.MpoolModuleAPI = (*Node)(nil)
_ full.StateModuleAPI = (*Node)(nil)
_ full.EthModuleAPI = (*Node)(nil)
)

type options struct {
Expand Down
3 changes: 3 additions & 0 deletions node/builder_chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@ var ChainNode = Options(
Override(new(full.StateModuleAPI), From(new(api.Gateway))),
Override(new(stmgr.StateManagerAPI), rpcstmgr.NewRPCStateManager),
Override(new(full.EthModuleAPI), From(new(api.Gateway))),
Override(new(full.EthTxHashManager), &full.EthTxHashManagerDummy{}),
Override(new(full.EthEventAPI), From(new(api.Gateway))),
Override(new(full.ActorEventAPI), From(new(api.Gateway))),
),
Expand Down Expand Up @@ -261,12 +262,14 @@ func ConfigFullNode(c interface{}) Option {

If(cfg.Fevm.EnableEthRPC,
Override(new(*full.EthEventHandler), modules.EthEventHandler(cfg.Events, cfg.Fevm.EnableEthRPC)),
Override(new(full.EthTxHashManager), modules.EthTxHashManager(cfg.Fevm)),
Override(new(full.EthModuleAPI), modules.EthModuleAPI(cfg.Fevm)),
Override(new(full.EthEventAPI), From(new(*full.EthEventHandler))),
),
If(!cfg.Fevm.EnableEthRPC,
Override(new(full.EthModuleAPI), &full.EthModuleDummy{}),
Override(new(full.EthEventAPI), &full.EthModuleDummy{}),
Override(new(full.EthTxHashManager), &full.EthTxHashManagerDummy{}),
),

If(cfg.Events.EnableActorEventsAPI,
Expand Down
4 changes: 0 additions & 4 deletions node/impl/full/dummy.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,10 +139,6 @@ func (e *EthModuleDummy) EthSendRawTransaction(ctx context.Context, rawTx ethtyp
return ethtypes.EthHash{}, ErrModuleDisabled
}

func (e *EthModuleDummy) EthSendRawTransactionUntrusted(ctx context.Context, rawTx ethtypes.EthBytes) (ethtypes.EthHash, error) {
return ethtypes.EthHash{}, ErrModuleDisabled
}

func (e *EthModuleDummy) Web3ClientVersion(ctx context.Context) (string, error) {
return "", ErrModuleDisabled
}
Expand Down
26 changes: 14 additions & 12 deletions node/impl/full/eth.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,6 @@ type EthModuleAPI interface {
EthCall(ctx context.Context, tx ethtypes.EthCall, blkParam ethtypes.EthBlockNumberOrHash) (ethtypes.EthBytes, error)
EthMaxPriorityFeePerGas(ctx context.Context) (ethtypes.EthBigInt, error)
EthSendRawTransaction(ctx context.Context, rawTx ethtypes.EthBytes) (ethtypes.EthHash, error)
EthSendRawTransactionUntrusted(ctx context.Context, rawTx ethtypes.EthBytes) (ethtypes.EthHash, error)
Web3ClientVersion(ctx context.Context) (string, error)
EthTraceBlock(ctx context.Context, blkNum string) ([]*ethtypes.EthTraceBlock, error)
EthTraceReplayBlockTransactions(ctx context.Context, blkNum string, traceTypes []string) ([]*ethtypes.EthTraceReplayBlockTransaction, error)
Expand All @@ -102,6 +101,7 @@ type EthEventAPI interface {
var (
_ EthModuleAPI = *new(api.FullNode)
_ EthEventAPI = *new(api.FullNode)
_ EthModuleAPI = *new(api.Gateway)
)

// EthModule provides the default implementation of the standard Ethereum JSON-RPC API.
Expand Down Expand Up @@ -135,7 +135,7 @@ type EthModule struct {
Chain *store.ChainStore
Mpool *messagepool.MessagePool
StateManager *stmgr.StateManager
EthTxHashManager *EthTxHashManager
EthTxHashManager EthTxHashManager
EthTraceFilterMaxResults uint64
EthEventHandler *EthEventHandler

Expand Down Expand Up @@ -169,8 +169,10 @@ type EthAPI struct {
Chain *store.ChainStore
StateManager *stmgr.StateManager

EthTxHashManager
EthModuleAPI
EthEventAPI
MpoolAPI
}

var ErrNullRound = errors.New("requested epoch was a null round")
Expand Down Expand Up @@ -355,7 +357,7 @@ func (a *EthModule) EthGetTransactionByHashLimited(ctx context.Context, txHash *
return nil, nil
}

c, err := a.EthTxHashManager.TransactionHashLookup.GetCidFromHash(*txHash)
c, err := a.EthTxHashManager.GetCidFromHash(*txHash)
if err != nil {
log.Debug("could not find transaction hash %s in lookup table", txHash.String())
}
Expand Down Expand Up @@ -414,7 +416,7 @@ func (a *EthModule) EthGetMessageCidByTransactionHash(ctx context.Context, txHas
return nil, nil
}

c, err := a.EthTxHashManager.TransactionHashLookup.GetCidFromHash(*txHash)
c, err := a.EthTxHashManager.GetCidFromHash(*txHash)
// We fall out of the first condition and continue
if errors.Is(err, ethhashlookup.ErrNotFound) {
log.Debug("could not find transaction hash %s in lookup table", txHash.String())
Expand Down Expand Up @@ -498,7 +500,7 @@ func (a *EthModule) EthGetTransactionReceipt(ctx context.Context, txHash ethtype
}

func (a *EthModule) EthGetTransactionReceiptLimited(ctx context.Context, txHash ethtypes.EthHash, limit abi.ChainEpoch) (*api.EthTxReceipt, error) {
c, err := a.EthTxHashManager.TransactionHashLookup.GetCidFromHash(txHash)
c, err := a.EthTxHashManager.GetCidFromHash(txHash)
if err != nil {
log.Debug("could not find transaction hash %s in lookup table", txHash.String())
}
Expand Down Expand Up @@ -917,14 +919,14 @@ func (a *EthModule) EthGasPrice(ctx context.Context) (ethtypes.EthBigInt, error)
}

func (a *EthModule) EthSendRawTransaction(ctx context.Context, rawTx ethtypes.EthBytes) (ethtypes.EthHash, error) {
return a.ethSendRawTransaction(ctx, rawTx, false)
return ethSendRawTransaction(ctx, a.MpoolAPI, a.EthTxHashManager, rawTx, false)
}

func (a *EthModule) EthSendRawTransactionUntrusted(ctx context.Context, rawTx ethtypes.EthBytes) (ethtypes.EthHash, error) {
return a.ethSendRawTransaction(ctx, rawTx, true)
func (a *EthAPI) EthSendRawTransactionUntrusted(ctx context.Context, rawTx ethtypes.EthBytes) (ethtypes.EthHash, error) {
return ethSendRawTransaction(ctx, a.MpoolAPI, a.EthTxHashManager, rawTx, true)
}

func (a *EthModule) ethSendRawTransaction(ctx context.Context, rawTx ethtypes.EthBytes, untrusted bool) (ethtypes.EthHash, error) {
func ethSendRawTransaction(ctx context.Context, mpool MpoolAPI, ethTxHashManager EthTxHashManager, rawTx ethtypes.EthBytes, untrusted bool) (ethtypes.EthHash, error) {
txArgs, err := ethtypes.ParseEthTransaction(rawTx)
if err != nil {
return ethtypes.EmptyEthHash, err
Expand All @@ -941,18 +943,18 @@ func (a *EthModule) ethSendRawTransaction(ctx context.Context, rawTx ethtypes.Et
}

if untrusted {
if _, err = a.MpoolAPI.MpoolPushUntrusted(ctx, smsg); err != nil {
if _, err = mpool.MpoolPushUntrusted(ctx, smsg); err != nil {
return ethtypes.EmptyEthHash, err
}
} else {
if _, err = a.MpoolAPI.MpoolPush(ctx, smsg); err != nil {
if _, err = mpool.MpoolPush(ctx, smsg); err != nil {
return ethtypes.EmptyEthHash, err
}
}

// make it immediately available in the transaction hash lookup db, even though it will also
// eventually get there via the mpool
if err := a.EthTxHashManager.TransactionHashLookup.UpsertHash(txHash, smsg.Cid()); err != nil {
if err := ethTxHashManager.UpsertHash(txHash, smsg.Cid()); err != nil {
log.Errorf("error inserting tx mapping to db: %s", err)
}

Expand Down
96 changes: 80 additions & 16 deletions node/impl/full/txhashmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,32 +6,56 @@ import (

"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/go-state-types/crypto"
"github.com/ipfs/go-cid"

"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/build/buildconstants"
"github.com/filecoin-project/lotus/chain/ethhashlookup"
"github.com/filecoin-project/lotus/chain/events"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/chain/types/ethtypes"
)

type EthTxHashManager struct {
StateAPI StateAPI
TransactionHashLookup *ethhashlookup.EthTxHashLookup
type EthTxHashManager interface {
events.TipSetObserver

PopulateExistingMappings(ctx context.Context, minHeight abi.ChainEpoch) error
ProcessSignedMessage(ctx context.Context, msg *types.SignedMessage)
UpsertHash(txHash ethtypes.EthHash, c cid.Cid) error
GetCidFromHash(txHash ethtypes.EthHash) (cid.Cid, error)
DeleteEntriesOlderThan(days int) (int64, error)
}

var (
_ EthTxHashManager = (*ethTxHashManager)(nil)
_ EthTxHashManager = (*EthTxHashManagerDummy)(nil)
)

type ethTxHashManager struct {
stateAPI StateAPI
transactionHashLookup *ethhashlookup.EthTxHashLookup
}

func NewEthTxHashManager(stateAPI StateAPI, transactionHashLookup *ethhashlookup.EthTxHashLookup) EthTxHashManager {
return &ethTxHashManager{
stateAPI: stateAPI,
transactionHashLookup: transactionHashLookup,
}
}

func (m *EthTxHashManager) Revert(ctx context.Context, from, to *types.TipSet) error {
func (m *ethTxHashManager) Revert(ctx context.Context, from, to *types.TipSet) error {
return nil
}

func (m *EthTxHashManager) PopulateExistingMappings(ctx context.Context, minHeight abi.ChainEpoch) error {
func (m *ethTxHashManager) PopulateExistingMappings(ctx context.Context, minHeight abi.ChainEpoch) error {
if minHeight < buildconstants.UpgradeHyggeHeight {
minHeight = buildconstants.UpgradeHyggeHeight
}

ts := m.StateAPI.Chain.GetHeaviestTipSet()
ts := m.stateAPI.Chain.GetHeaviestTipSet()
for ts.Height() > minHeight {
for _, block := range ts.Blocks() {
msgs, err := m.StateAPI.Chain.SecpkMessagesForBlock(ctx, block)
msgs, err := m.stateAPI.Chain.SecpkMessagesForBlock(ctx, block)
if err != nil {
// If we can't find the messages, we've either imported from snapshot or pruned the store
log.Debug("exiting message mapping population at epoch ", ts.Height())
Expand All @@ -44,7 +68,7 @@ func (m *EthTxHashManager) PopulateExistingMappings(ctx context.Context, minHeig
}

var err error
ts, err = m.StateAPI.Chain.GetTipSetFromKey(ctx, ts.Parents())
ts, err = m.stateAPI.Chain.GetTipSetFromKey(ctx, ts.Parents())
if err != nil {
return err
}
Expand All @@ -53,9 +77,9 @@ func (m *EthTxHashManager) PopulateExistingMappings(ctx context.Context, minHeig
return nil
}

func (m *EthTxHashManager) Apply(ctx context.Context, from, to *types.TipSet) error {
func (m *ethTxHashManager) Apply(ctx context.Context, from, to *types.TipSet) error {
for _, blk := range to.Blocks() {
_, smsgs, err := m.StateAPI.Chain.MessagesForBlock(ctx, blk)
_, smsgs, err := m.stateAPI.Chain.MessagesForBlock(ctx, blk)
if err != nil {
return err
}
Expand All @@ -70,7 +94,7 @@ func (m *EthTxHashManager) Apply(ctx context.Context, from, to *types.TipSet) er
return err
}

err = m.TransactionHashLookup.UpsertHash(hash, smsg.Cid())
err = m.transactionHashLookup.UpsertHash(hash, smsg.Cid())
if err != nil {
return err
}
Expand All @@ -80,7 +104,19 @@ func (m *EthTxHashManager) Apply(ctx context.Context, from, to *types.TipSet) er
return nil
}

func (m *EthTxHashManager) ProcessSignedMessage(ctx context.Context, msg *types.SignedMessage) {
func (m *ethTxHashManager) UpsertHash(txHash ethtypes.EthHash, c cid.Cid) error {
return m.transactionHashLookup.UpsertHash(txHash, c)
}

func (m *ethTxHashManager) GetCidFromHash(txHash ethtypes.EthHash) (cid.Cid, error) {
return m.transactionHashLookup.GetCidFromHash(txHash)
}

func (m *ethTxHashManager) DeleteEntriesOlderThan(days int) (int64, error) {
return m.transactionHashLookup.DeleteEntriesOlderThan(days)
}

func (m *ethTxHashManager) ProcessSignedMessage(ctx context.Context, msg *types.SignedMessage) {
if msg.Signature.Type != crypto.SigTypeDelegated {
return
}
Expand All @@ -97,14 +133,14 @@ func (m *EthTxHashManager) ProcessSignedMessage(ctx context.Context, msg *types.
return
}

err = m.TransactionHashLookup.UpsertHash(txHash, msg.Cid())
err = m.UpsertHash(txHash, msg.Cid())
if err != nil {
log.Errorf("error inserting tx mapping to db: %s", err)
return
}
}

func WaitForMpoolUpdates(ctx context.Context, ch <-chan api.MpoolUpdate, manager *EthTxHashManager) {
func WaitForMpoolUpdates(ctx context.Context, ch <-chan api.MpoolUpdate, manager EthTxHashManager) {
for {
select {
case <-ctx.Done():
Expand All @@ -119,18 +155,46 @@ func WaitForMpoolUpdates(ctx context.Context, ch <-chan api.MpoolUpdate, manager
}
}

func EthTxHashGC(ctx context.Context, retentionDays int, manager *EthTxHashManager) {
func EthTxHashGC(ctx context.Context, retentionDays int, manager EthTxHashManager) {
if retentionDays == 0 {
return
}

gcPeriod := 1 * time.Hour
for {
entriesDeleted, err := manager.TransactionHashLookup.DeleteEntriesOlderThan(retentionDays)
entriesDeleted, err := manager.DeleteEntriesOlderThan(retentionDays)
if err != nil {
log.Errorf("error garbage collecting eth transaction hash database: %s", err)
}
log.Info("garbage collection run on eth transaction hash lookup database. %d entries deleted", entriesDeleted)
time.Sleep(gcPeriod)
}
}

type EthTxHashManagerDummy struct{}

func (d *EthTxHashManagerDummy) PopulateExistingMappings(ctx context.Context, minHeight abi.ChainEpoch) error {
return nil
}

func (d *EthTxHashManagerDummy) Revert(ctx context.Context, from, to *types.TipSet) error {
return nil
}

func (d *EthTxHashManagerDummy) Apply(ctx context.Context, from, to *types.TipSet) error {
return nil
}

func (d *EthTxHashManagerDummy) ProcessSignedMessage(ctx context.Context, msg *types.SignedMessage) {}

func (d *EthTxHashManagerDummy) UpsertHash(txHash ethtypes.EthHash, c cid.Cid) error {
return nil
}

func (d *EthTxHashManagerDummy) GetCidFromHash(txHash ethtypes.EthHash) (cid.Cid, error) {
return cid.Undef, nil
}

func (d *EthTxHashManagerDummy) DeleteEntriesOlderThan(days int) (int64, error) {
return 0, nil
}
25 changes: 15 additions & 10 deletions node/modules/ethmodule.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ import (
"github.com/filecoin-project/lotus/node/repo"
)

func EthModuleAPI(cfg config.FevmConfig) func(helpers.MetricsCtx, repo.LockedRepo, fx.Lifecycle, *store.ChainStore, *stmgr.StateManager, EventHelperAPI, *messagepool.MessagePool, full.StateAPI, full.ChainAPI, full.MpoolAPI, full.SyncAPI, *full.EthEventHandler) (*full.EthModule, error) {
return func(mctx helpers.MetricsCtx, r repo.LockedRepo, lc fx.Lifecycle, cs *store.ChainStore, sm *stmgr.StateManager, evapi EventHelperAPI, mp *messagepool.MessagePool, stateapi full.StateAPI, chainapi full.ChainAPI, mpoolapi full.MpoolAPI, syncapi full.SyncAPI, ethEventHandler *full.EthEventHandler) (*full.EthModule, error) {
func EthTxHashManager(cfg config.FevmConfig) func(helpers.MetricsCtx, repo.LockedRepo, fx.Lifecycle, *store.ChainStore, EventHelperAPI, *messagepool.MessagePool, full.StateAPI, full.SyncAPI) (full.EthTxHashManager, error) {
return func(mctx helpers.MetricsCtx, r repo.LockedRepo, lc fx.Lifecycle, cs *store.ChainStore, evapi EventHelperAPI, mp *messagepool.MessagePool, stateapi full.StateAPI, syncapi full.SyncAPI) (full.EthTxHashManager, error) {
ctx := helpers.LifecycleCtx(mctx, lc)

sqlitePath, err := r.SqlitePath()
Expand All @@ -51,10 +51,7 @@ func EthModuleAPI(cfg config.FevmConfig) func(helpers.MetricsCtx, repo.LockedRep
},
})

ethTxHashManager := full.EthTxHashManager{
StateAPI: stateapi,
TransactionHashLookup: transactionHashLookup,
}
ethTxHashManager := full.NewEthTxHashManager(stateapi, transactionHashLookup)

if !dbAlreadyExists {
err = ethTxHashManager.PopulateExistingMappings(mctx, 0)
Expand Down Expand Up @@ -82,21 +79,29 @@ func EthModuleAPI(cfg config.FevmConfig) func(helpers.MetricsCtx, repo.LockedRep
}

// Tipset listener
_ = ev.Observe(&ethTxHashManager)
_ = ev.Observe(ethTxHashManager)

ch, err := mp.Updates(ctx)
if err != nil {
return err
}
go full.WaitForMpoolUpdates(ctx, ch, &ethTxHashManager)
go full.EthTxHashGC(ctx, cfg.EthTxHashMappingLifetimeDays, &ethTxHashManager)
go full.WaitForMpoolUpdates(ctx, ch, ethTxHashManager)
go full.EthTxHashGC(ctx, cfg.EthTxHashMappingLifetimeDays, ethTxHashManager)

return nil
},
})

return ethTxHashManager, nil
}
}

func EthModuleAPI(cfg config.FevmConfig) func(helpers.MetricsCtx, repo.LockedRepo, fx.Lifecycle, *store.ChainStore, *stmgr.StateManager, *messagepool.MessagePool, full.StateAPI, full.ChainAPI, full.MpoolAPI, full.SyncAPI, *full.EthEventHandler, full.EthTxHashManager) (*full.EthModule, error) {
return func(mctx helpers.MetricsCtx, r repo.LockedRepo, lc fx.Lifecycle, cs *store.ChainStore, sm *stmgr.StateManager, mp *messagepool.MessagePool, stateapi full.StateAPI, chainapi full.ChainAPI, mpoolapi full.MpoolAPI, syncapi full.SyncAPI, ethEventHandler *full.EthEventHandler, ethTxHashManager full.EthTxHashManager) (*full.EthModule, error) {

var blkCache *arc.ARCCache[cid.Cid, *ethtypes.EthBlock]
var blkTxCache *arc.ARCCache[cid.Cid, *ethtypes.EthBlock]
var err error
if cfg.EthBlkCacheSize > 0 {
blkCache, err = arc.NewARC[cid.Cid, *ethtypes.EthBlock](cfg.EthBlkCacheSize)
if err != nil {
Expand All @@ -120,7 +125,7 @@ func EthModuleAPI(cfg config.FevmConfig) func(helpers.MetricsCtx, repo.LockedRep
SyncAPI: syncapi,
EthEventHandler: ethEventHandler,

EthTxHashManager: &ethTxHashManager,
EthTxHashManager: ethTxHashManager,
EthTraceFilterMaxResults: cfg.EthTraceFilterMaxResults,

EthBlkCache: blkCache,
Expand Down

0 comments on commit f56b61d

Please sign in to comment.