diff --git a/chain/ethhashlookup/eth_transaction_hash_lookup.go b/chain/ethhashlookup/eth_transaction_hash_lookup.go index 2a34e37aa03..4d974d0d9fb 100644 --- a/chain/ethhashlookup/eth_transaction_hash_lookup.go +++ b/chain/ethhashlookup/eth_transaction_hash_lookup.go @@ -16,7 +16,10 @@ import ( const DefaultDbFilename = "txhash.db" -var ErrNotFound = errors.New("not found") +var ( + ErrNotFound = errors.New("not found") + ErrAlreadyIndexed = errors.New("already indexed") +) var ddls = []string{ `CREATE TABLE IF NOT EXISTS eth_tx_hashes ( @@ -29,19 +32,23 @@ var ddls = []string{ } const ( - insertTxHash = `INSERT INTO eth_tx_hashes (hash, cid) VALUES(?, ?) ON CONFLICT (hash) DO UPDATE SET insertion_time = CURRENT_TIMESTAMP` - getCidFromHash = `SELECT cid FROM eth_tx_hashes WHERE hash = ?` - getHashFromCid = `SELECT hash FROM eth_tx_hashes WHERE cid = ?` - deleteOlderThan = `DELETE FROM eth_tx_hashes WHERE insertion_time < datetime('now', ?);` + insertTxHash = `INSERT INTO eth_tx_hashes (hash, cid) VALUES(?, ?) ON CONFLICT (hash) DO UPDATE SET insertion_time = CURRENT_TIMESTAMP` + insertUniqueTxHash = `INSERT INTO eth_tx_hashes (hash, cid) VALUES(?, ?) ON CONFLICT (hash) DO NOTHING` + getCidFromHash = `SELECT cid FROM eth_tx_hashes WHERE hash = ?` + getHashFromCid = `SELECT hash FROM eth_tx_hashes WHERE cid = ?` + deleteOlderThan = `DELETE FROM eth_tx_hashes WHERE insertion_time < datetime('now', ?);` + getLastTxHash = `SELECT hash, cid FROM eth_tx_hashes ORDER BY insertion_time DESC LIMIT 1` ) type EthTxHashLookup struct { db *sql.DB - stmtInsertTxHash *sql.Stmt - stmtGetCidFromHash *sql.Stmt - stmtGetHashFromCid *sql.Stmt - stmtDeleteOlderThan *sql.Stmt + stmtInsertTxHash *sql.Stmt + stmtInsertUniqueTxHash *sql.Stmt + stmtGetCidFromHash *sql.Stmt + stmtGetHashFromCid *sql.Stmt + stmtDeleteOlderThan *sql.Stmt + stmtGetLastTxHash *sql.Stmt } func NewTransactionHashLookup(ctx context.Context, path string) (*EthTxHashLookup, error) { @@ -68,23 +75,63 @@ func NewTransactionHashLookup(ctx context.Context, path string) (*EthTxHashLooku func (ei *EthTxHashLookup) initStatements() (err error) { ei.stmtInsertTxHash, err = ei.db.Prepare(insertTxHash) if err != nil { - return xerrors.Errorf("prepare stmtInsertTxHash: %w", err) + return xerrors.Errorf("failed to prepare stmtInsertTxHash: %w", err) } + + ei.stmtInsertUniqueTxHash, err = ei.db.Prepare(insertUniqueTxHash) + if err != nil { + return xerrors.Errorf("failed to prepare stmtInsertUniqueTxHash: %w", err) + } + ei.stmtGetCidFromHash, err = ei.db.Prepare(getCidFromHash) if err != nil { - return xerrors.Errorf("prepare stmtGetCidFromHash: %w", err) + return xerrors.Errorf("failed to prepare stmtGetCidFromHash: %w", err) } + ei.stmtGetHashFromCid, err = ei.db.Prepare(getHashFromCid) if err != nil { - return xerrors.Errorf("prepare stmtGetHashFromCid: %w", err) + return xerrors.Errorf("failed to prepare stmtGetHashFromCid: %w", err) } + ei.stmtDeleteOlderThan, err = ei.db.Prepare(deleteOlderThan) if err != nil { - return xerrors.Errorf("prepare stmtDeleteOlderThan: %w", err) + return xerrors.Errorf("failed to prepare stmtDeleteOlderThan: %w", err) } + + ei.stmtGetLastTxHash, err = ei.db.Prepare(getLastTxHash) + if err != nil { + return xerrors.Errorf("failed to prepare stmtGetMostRecentTxHash: %w", err) + } + return nil } +// UpsertUniqueHash inserts a new transaction hash and cid into the database, but only if the transaction hash does not already exist. +func (ei *EthTxHashLookup) UpsertUniqueHash(txHash ethtypes.EthHash, c cid.Cid) error { + if ei.db == nil { + return xerrors.New("db closed") + } + + result, err := ei.stmtInsertUniqueTxHash.Exec(txHash.String(), c.String()) + if err != nil { + return err + } + + affected, err := result.RowsAffected() + if err != nil { + return err + } + + if affected == 0 { + // The row already existed, so no insertion occurred + return ErrAlreadyIndexed + } + + return err +} + +// UpsertHash inserts a new transaction hash and cid into the database, +// and updates the insertion time if the transaction hash already exists. func (ei *EthTxHashLookup) UpsertHash(txHash ethtypes.EthHash, c cid.Cid) error { if ei.db == nil { return xerrors.New("db closed") @@ -128,6 +175,22 @@ func (ei *EthTxHashLookup) GetHashFromCid(c cid.Cid) (ethtypes.EthHash, error) { return ethtypes.ParseEthHash(hashString) } +// GetLastTransaction returns the most recent transaction hash and cid from the database based on insertion_time +func (ei *EthTxHashLookup) GetLastTransaction() (string, cid.Cid, error) { + if ei.db == nil { + return "", cid.Undef, xerrors.New("db closed") + } + + var hashStr, cidStr string + err := ei.stmtGetLastTxHash.QueryRow().Scan(&hashStr, &cidStr) + if err != nil { + return "", cid.Undef, err + } + + c, err := cid.Decode(cidStr) + return hashStr, c, err +} + func (ei *EthTxHashLookup) DeleteEntriesOlderThan(days int) (int64, error) { if ei.db == nil { return 0, xerrors.New("db closed") diff --git a/chain/ethhashlookup/index.go b/chain/ethhashlookup/index.go new file mode 100644 index 00000000000..2026e145c4e --- /dev/null +++ b/chain/ethhashlookup/index.go @@ -0,0 +1,119 @@ +package ethhashlookup + +import ( + "context" + "os" + + "github.com/filecoin-project/lotus/chain/store" + "github.com/filecoin-project/lotus/chain/types" + "github.com/filecoin-project/lotus/chain/types/ethtypes" + "github.com/filecoin-project/lotus/lib/sqlite" + logging "github.com/ipfs/go-log/v2" + "golang.org/x/xerrors" +) + +var log = logging.Logger("txhashindex") + +// ChainStore interface; we could use store.ChainStore directly, +// but this simplifies unit testing. +type ChainStore interface { + SubscribeHeadChanges(f store.ReorgNotifee) + GetSecpkMessagesForTipset(ctx context.Context, ts *types.TipSet) ([]*types.SignedMessage, error) + GetHeaviestTipSet() *types.TipSet + GetTipSetFromKey(ctx context.Context, tsk types.TipSetKey) (*types.TipSet, error) +} + +var _ ChainStore = (*store.ChainStore)(nil) + +// PopulateAfterSnapshot populates the tx hash index after a snapshot is restored. +func PopulateAfterSnapshot(ctx context.Context, path string, cs ChainStore) error { + // if a database already exists, we try to delete it and create a new one + if _, err := os.Stat(path); err == nil { + if err = os.Remove(path); err != nil { + return xerrors.Errorf("tx hash index already exists at %s and can't be deleted", path) + } + } + + db, _, err := sqlite.Open(path) + if err != nil { + return xerrors.Errorf("failed to setup tx hash index db: %w", err) + } + defer func() { + if err := db.Close(); err != nil { + log.Errorf("error closing tx hash database: %s", err) + } + }() + + if err = sqlite.InitDb(ctx, "message index", db, ddls, []sqlite.MigrationFunc{}); err != nil { + _ = db.Close() + return xerrors.Errorf("error creating tx hash index database: %w", err) + } + + tx, err := db.Begin() + if err != nil { + return xerrors.Errorf("error when starting transaction: %w", err) + } + + rollback := func() { + if err := tx.Rollback(); err != nil { + log.Errorf("error in rollback: %s", err) + } + } + + insertStmt, err := tx.Prepare(insertTxHash) + if err != nil { + rollback() + return xerrors.Errorf("error preparing insertStmt: %w", err) + } + + defer func() { + _ = insertStmt.Close() + }() + + var ( + curTs = cs.GetHeaviestTipSet() + startHeight = curTs.Height() + msgs []*types.SignedMessage + ) + + for curTs != nil { + msgs, err = cs.GetSecpkMessagesForTipset(ctx, curTs) + if err != nil { + log.Infof("stopping import after %d tipsets", startHeight-curTs.Height()) + break + } + + for _, m := range msgs { + ethTx, err := ethtypes.EthTransactionFromSignedFilecoinMessage(m) + if err != nil { + rollback() + return err + } + + txHash, err := ethTx.TxHash() + if err != nil { + rollback() + return err + } + + _, err = insertStmt.Exec(txHash.String(), m.Cid().String()) + if err != nil { + rollback() + return err + } + } + + curTs, err = cs.GetTipSetFromKey(ctx, curTs.Parents()) + if err != nil { + rollback() + return xerrors.Errorf("error walking chain: %w", err) + } + } + + err = tx.Commit() + if err != nil { + return xerrors.Errorf("error committing transaction: %w", err) + } + + return nil +} diff --git a/chain/store/messages.go b/chain/store/messages.go index 60a1eaa608f..bb5d9a67977 100644 --- a/chain/store/messages.go +++ b/chain/store/messages.go @@ -350,3 +350,18 @@ func (cs *ChainStore) LoadSignedMessagesFromCids(ctx context.Context, cids []cid return msgs, nil } + +// GetSecpkMessagesForTipset returns all the secpk messages for a tipset +func (cs *ChainStore) GetSecpkMessagesForTipset(ctx context.Context, ts *types.TipSet) ([]*types.SignedMessage, error) { + msgs := make([]*types.SignedMessage, 0) + for _, b := range ts.Blocks() { + secpkmsgs, err := cs.SecpkMessagesForBlock(ctx, b) + if err != nil { + return nil, xerrors.Errorf("failed to get secpk messages for block: %w", err) + } + + msgs = append(msgs, secpkmsgs...) + } + + return msgs, nil +} diff --git a/cmd/lotus/daemon.go b/cmd/lotus/daemon.go index e13557a943a..c204ca87fa8 100644 --- a/cmd/lotus/daemon.go +++ b/cmd/lotus/daemon.go @@ -36,6 +36,7 @@ import ( "github.com/filecoin-project/lotus/chain/beacon/drand" "github.com/filecoin-project/lotus/chain/consensus" "github.com/filecoin-project/lotus/chain/consensus/filcns" + "github.com/filecoin-project/lotus/chain/ethhashlookup" "github.com/filecoin-project/lotus/chain/index" proofsffi "github.com/filecoin-project/lotus/chain/proofs/ffi" "github.com/filecoin-project/lotus/chain/stmgr" @@ -650,6 +651,19 @@ func ImportChain(ctx context.Context, r repo.Repo, fname string, snapshot bool) log.Info("populating message index done") } + if cfg.Index.EnableAutomaticBackFillTxIndex { + log.Info("back-filling tx index...") + basePath, err := lr.SqlitePath() + if err != nil { + return err + } + + if err = ethhashlookup.PopulateAfterSnapshot(ctx, filepath.Join(basePath, ethhashlookup.DefaultDbFilename), cst); err != nil { + return err + } + + log.Info("populating tx index done") + } return nil } diff --git a/extern/filecoin-ffi b/extern/filecoin-ffi index d041c9ab85e..e467d2992e3 160000 --- a/extern/filecoin-ffi +++ b/extern/filecoin-ffi @@ -1 +1 @@ -Subproject commit d041c9ab85e68cfb57daf5fbaedeeee0f72cb5ad +Subproject commit e467d2992e3f9bd09beb71ecf84323b45d2a3511 diff --git a/node/builder_chain.go b/node/builder_chain.go index ffdcf3a64a2..d94d189be94 100644 --- a/node/builder_chain.go +++ b/node/builder_chain.go @@ -261,7 +261,7 @@ func ConfigFullNode(c interface{}) Option { If(cfg.Fevm.EnableEthRPC, Override(new(*full.EthEventHandler), modules.EthEventHandler(cfg.Events, cfg.Fevm.EnableEthRPC)), - Override(new(full.EthModuleAPI), modules.EthModuleAPI(cfg.Fevm)), + Override(new(full.EthModuleAPI), modules.EthModuleAPI(cfg.Fevm, cfg.Index.EnableAutomaticBackFillTxIndex, cfg.Index.MaxAutomaticBackFillTxIndexHeight)), Override(new(full.EthEventAPI), From(new(*full.EthEventHandler))), ), If(!cfg.Fevm.EnableEthRPC, diff --git a/node/config/def.go b/node/config/def.go index cc390371302..e99ae02557e 100644 --- a/node/config/def.go +++ b/node/config/def.go @@ -97,6 +97,10 @@ func DefaultFullNode() *FullNode { MaxFilterResults: 10000, MaxFilterHeightRange: 2880, // conservative limit of one day }, + Index: IndexConfig{ + EnableAutomaticBackFillTxIndex: false, + MaxAutomaticBackFillTxIndexHeight: 5760, // total epochs in 2 days, (30 seconds per epoch) + }, } } diff --git a/node/config/types.go b/node/config/types.go index d7753d4e19e..70260656b7d 100644 --- a/node/config/types.go +++ b/node/config/types.go @@ -71,7 +71,7 @@ type ApisConfig struct { } type JournalConfig struct { - //Events of the form: "system1:event1,system1:event2[,...]" + // Events of the form: "system1:event1,system1:event2[,...]" DisabledEvents string } @@ -632,6 +632,12 @@ type IndexConfig struct { // EXPERIMENTAL FEATURE. USE WITH CAUTION // EnableMsgIndex enables indexing of messages on chain. EnableMsgIndex bool + + // EnableAutomaticBackFillTxIndex enables automatic index back-filling + EnableAutomaticBackFillTxIndex bool + + // Maximum number of blocks to process during a single back-fill operation + MaxAutomaticBackFillTxIndexHeight uint64 } type HarmonyDB struct { diff --git a/node/impl/full/txhashmanager.go b/node/impl/full/txhashmanager.go index df31670b60a..f87a1867140 100644 --- a/node/impl/full/txhashmanager.go +++ b/node/impl/full/txhashmanager.go @@ -2,11 +2,11 @@ package full import ( "context" + "errors" "time" "github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/go-state-types/crypto" - "github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/build/buildconstants" "github.com/filecoin-project/lotus/chain/ethhashlookup" @@ -16,13 +16,63 @@ import ( type EthTxHashManager struct { StateAPI StateAPI + ChainAPI ChainAPI TransactionHashLookup *ethhashlookup.EthTxHashLookup } -func (m *EthTxHashManager) Revert(ctx context.Context, from, to *types.TipSet) error { +func (m *EthTxHashManager) Revert(_ context.Context, _, _ *types.TipSet) error { + return nil +} + +// FillIndexGap populates the Ethereum transaction hash lookup database with missing entries +// by processing blocks until we reach the maximum number of automatic back-fill epochs or the message is already indexed. +func (m *EthTxHashManager) FillIndexGap(ctx context.Context, currHead *types.TipSet, maxAutomaticBackFillBlocks abi.ChainEpoch) error { + log.Info("Start back-filling transaction index from current head: %d", currHead.Height()) + + var ( + fromTs = currHead + processedBlocks = uint64(0) + ) + + for i := abi.ChainEpoch(0); i < maxAutomaticBackFillBlocks && fromTs.Height() > 0; i++ { + for _, block := range fromTs.Blocks() { + msgs, err := m.StateAPI.Chain.SecpkMessagesForBlock(ctx, block) + if err != nil { + log.Debugf("Exiting back-filling at epoch %d: %s", currHead.Height(), err) + return nil + } + + for _, msg := range msgs { + err = m.StoreMsg(ctx, msg) + if err != nil { + if errors.Is(err, ethhashlookup.ErrAlreadyIndexed) { + log.Infof("Reached already indexed transaction at height %d. ", fromTs.Height()) + + log.Info("Stop back-filling tx index, Total processed blocks: %d", processedBlocks) + return nil + } + + return err + } + } + + processedBlocks++ + } + + // Move to the previous tipset + var err error + fromTs, err = m.ChainAPI.ChainGetTipSet(ctx, fromTs.Parents()) + if err != nil { + return err + } + } + + log.Info("Finished back-filling tx index, Total processed blocks: %d", processedBlocks) + return nil } +// PopulateExistingMappings walks back from the current head to the minimum height and populates the eth transaction hash lookup database func (m *EthTxHashManager) PopulateExistingMappings(ctx context.Context, minHeight abi.ChainEpoch) error { if minHeight < buildconstants.UpgradeHyggeHeight { minHeight = buildconstants.UpgradeHyggeHeight @@ -53,7 +103,7 @@ func (m *EthTxHashManager) PopulateExistingMappings(ctx context.Context, minHeig return nil } -func (m *EthTxHashManager) Apply(ctx context.Context, from, to *types.TipSet) error { +func (m *EthTxHashManager) Apply(ctx context.Context, _, to *types.TipSet) error { for _, blk := range to.Blocks() { _, smsgs, err := m.StateAPI.Chain.MessagesForBlock(ctx, blk) if err != nil { @@ -80,20 +130,34 @@ func (m *EthTxHashManager) Apply(ctx context.Context, from, to *types.TipSet) er return nil } -func (m *EthTxHashManager) ProcessSignedMessage(ctx context.Context, msg *types.SignedMessage) { +// StoreMsg is similar to ProcessSignedMessage, but it returns an error if the message is already indexed +// and does not attempt to index the message again. +func (m *EthTxHashManager) StoreMsg(_ context.Context, msg *types.SignedMessage) error { if msg.Signature.Type != crypto.SigTypeDelegated { - return + return nil + } + + txHash, err := m.getEthTxHash(msg) + if err != nil { + return err } + return m.TransactionHashLookup.UpsertUniqueHash(txHash, msg.Cid()) +} + +func (m *EthTxHashManager) getEthTxHash(msg *types.SignedMessage) (ethtypes.EthHash, error) { ethTx, err := ethtypes.EthTransactionFromSignedFilecoinMessage(msg) if err != nil { - log.Errorf("error converting filecoin message to eth tx: %s", err) - return + return ethtypes.EthHash{}, err } - txHash, err := ethTx.TxHash() + return ethTx.TxHash() +} + +func (m *EthTxHashManager) ProcessSignedMessage(_ context.Context, msg *types.SignedMessage) { + txHash, err := m.getEthTxHash(msg) if err != nil { - log.Errorf("error hashing transaction: %s", err) + log.Errorf("error converting filecoin message to eth tx: %s", err) return } @@ -119,7 +183,7 @@ func WaitForMpoolUpdates(ctx context.Context, ch <-chan api.MpoolUpdate, manager } } -func EthTxHashGC(ctx context.Context, retentionDays int, manager *EthTxHashManager) { +func EthTxHashGC(_ context.Context, retentionDays int, manager *EthTxHashManager) { if retentionDays == 0 { return } diff --git a/node/modules/ethmodule.go b/node/modules/ethmodule.go index ff087036545..34919c29306 100644 --- a/node/modules/ethmodule.go +++ b/node/modules/ethmodule.go @@ -9,10 +9,8 @@ import ( "github.com/hashicorp/golang-lru/arc/v2" "github.com/ipfs/go-cid" "go.uber.org/fx" - "golang.org/x/xerrors" "github.com/filecoin-project/go-state-types/abi" - "github.com/filecoin-project/lotus/chain/ethhashlookup" "github.com/filecoin-project/lotus/chain/events" "github.com/filecoin-project/lotus/chain/messagepool" @@ -23,9 +21,10 @@ import ( "github.com/filecoin-project/lotus/node/impl/full" "github.com/filecoin-project/lotus/node/modules/helpers" "github.com/filecoin-project/lotus/node/repo" + "golang.org/x/xerrors" ) -func EthModuleAPI(cfg config.FevmConfig) func(helpers.MetricsCtx, repo.LockedRepo, fx.Lifecycle, *store.ChainStore, *stmgr.StateManager, EventHelperAPI, *messagepool.MessagePool, full.StateAPI, full.ChainAPI, full.MpoolAPI, full.SyncAPI, *full.EthEventHandler) (*full.EthModule, error) { +func EthModuleAPI(cfg config.FevmConfig, enableAutomaticBackFill bool, maxAutomaticBackFillBlocks uint64) func(helpers.MetricsCtx, repo.LockedRepo, fx.Lifecycle, *store.ChainStore, *stmgr.StateManager, EventHelperAPI, *messagepool.MessagePool, full.StateAPI, full.ChainAPI, full.MpoolAPI, full.SyncAPI, *full.EthEventHandler) (*full.EthModule, error) { return func(mctx helpers.MetricsCtx, r repo.LockedRepo, lc fx.Lifecycle, cs *store.ChainStore, sm *stmgr.StateManager, evapi EventHelperAPI, mp *messagepool.MessagePool, stateapi full.StateAPI, chainapi full.ChainAPI, mpoolapi full.MpoolAPI, syncapi full.SyncAPI, ethEventHandler *full.EthEventHandler) (*full.EthModule, error) { ctx := helpers.LifecycleCtx(mctx, lc) @@ -53,6 +52,7 @@ func EthModuleAPI(cfg config.FevmConfig) func(helpers.MetricsCtx, repo.LockedRep ethTxHashManager := full.EthTxHashManager{ StateAPI: stateapi, + ChainAPI: chainapi, TransactionHashLookup: transactionHashLookup, } @@ -82,7 +82,16 @@ func EthModuleAPI(cfg config.FevmConfig) func(helpers.MetricsCtx, repo.LockedRep } // Tipset listener - _ = ev.Observe(ðTxHashManager) + head := ev.Observe(ðTxHashManager) + + if enableAutomaticBackFill { // If the db exists and back-fill is enabled, we'll back-fill missing entries + go func() { // since there is only one DB connection, so we can do this in a goroutine without worrying about concurrent write issues + err = ethTxHashManager.FillIndexGap(mctx, head, abi.ChainEpoch(maxAutomaticBackFillBlocks)) + if err != nil { + log.Warnf("error when back-filling transaction index gap: %w", err) + } + }() + } ch, err := mp.Updates(ctx) if err != nil {