Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(tx_index): fill transaction index gap on enabling automatic backfill #12353

Draft
wants to merge 4 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
89 changes: 76 additions & 13 deletions chain/ethhashlookup/eth_transaction_hash_lookup.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,10 @@ import (

const DefaultDbFilename = "txhash.db"

var ErrNotFound = errors.New("not found")
var (
ErrNotFound = errors.New("not found")
ErrAlreadyIndexed = errors.New("already indexed")
)

var ddls = []string{
`CREATE TABLE IF NOT EXISTS eth_tx_hashes (
Expand All @@ -29,19 +32,23 @@ var ddls = []string{
}

const (
insertTxHash = `INSERT INTO eth_tx_hashes (hash, cid) VALUES(?, ?) ON CONFLICT (hash) DO UPDATE SET insertion_time = CURRENT_TIMESTAMP`
getCidFromHash = `SELECT cid FROM eth_tx_hashes WHERE hash = ?`
getHashFromCid = `SELECT hash FROM eth_tx_hashes WHERE cid = ?`
deleteOlderThan = `DELETE FROM eth_tx_hashes WHERE insertion_time < datetime('now', ?);`
insertTxHash = `INSERT INTO eth_tx_hashes (hash, cid) VALUES(?, ?) ON CONFLICT (hash) DO UPDATE SET insertion_time = CURRENT_TIMESTAMP`
insertUniqueTxHash = `INSERT INTO eth_tx_hashes (hash, cid) VALUES(?, ?) ON CONFLICT (hash) DO NOTHING`
getCidFromHash = `SELECT cid FROM eth_tx_hashes WHERE hash = ?`
getHashFromCid = `SELECT hash FROM eth_tx_hashes WHERE cid = ?`
deleteOlderThan = `DELETE FROM eth_tx_hashes WHERE insertion_time < datetime('now', ?);`
getLastTxHash = `SELECT hash, cid FROM eth_tx_hashes ORDER BY insertion_time DESC LIMIT 1`
)

type EthTxHashLookup struct {
db *sql.DB

stmtInsertTxHash *sql.Stmt
stmtGetCidFromHash *sql.Stmt
stmtGetHashFromCid *sql.Stmt
stmtDeleteOlderThan *sql.Stmt
stmtInsertTxHash *sql.Stmt
stmtInsertUniqueTxHash *sql.Stmt
stmtGetCidFromHash *sql.Stmt
stmtGetHashFromCid *sql.Stmt
stmtDeleteOlderThan *sql.Stmt
stmtGetLastTxHash *sql.Stmt
}

func NewTransactionHashLookup(ctx context.Context, path string) (*EthTxHashLookup, error) {
Expand All @@ -68,23 +75,63 @@ func NewTransactionHashLookup(ctx context.Context, path string) (*EthTxHashLooku
func (ei *EthTxHashLookup) initStatements() (err error) {
ei.stmtInsertTxHash, err = ei.db.Prepare(insertTxHash)
if err != nil {
return xerrors.Errorf("prepare stmtInsertTxHash: %w", err)
return xerrors.Errorf("failed to prepare stmtInsertTxHash: %w", err)
}

ei.stmtInsertUniqueTxHash, err = ei.db.Prepare(insertUniqueTxHash)
if err != nil {
return xerrors.Errorf("failed to prepare stmtInsertUniqueTxHash: %w", err)
}

ei.stmtGetCidFromHash, err = ei.db.Prepare(getCidFromHash)
if err != nil {
return xerrors.Errorf("prepare stmtGetCidFromHash: %w", err)
return xerrors.Errorf("failed to prepare stmtGetCidFromHash: %w", err)
}

ei.stmtGetHashFromCid, err = ei.db.Prepare(getHashFromCid)
if err != nil {
return xerrors.Errorf("prepare stmtGetHashFromCid: %w", err)
return xerrors.Errorf("failed to prepare stmtGetHashFromCid: %w", err)
}

ei.stmtDeleteOlderThan, err = ei.db.Prepare(deleteOlderThan)
if err != nil {
return xerrors.Errorf("prepare stmtDeleteOlderThan: %w", err)
return xerrors.Errorf("failed to prepare stmtDeleteOlderThan: %w", err)
}

ei.stmtGetLastTxHash, err = ei.db.Prepare(getLastTxHash)
if err != nil {
return xerrors.Errorf("failed to prepare stmtGetMostRecentTxHash: %w", err)
}

return nil
}

// UpsertUniqueHash inserts a new transaction hash and cid into the database, but only if the transaction hash does not already exist.
func (ei *EthTxHashLookup) UpsertUniqueHash(txHash ethtypes.EthHash, c cid.Cid) error {
if ei.db == nil {
return xerrors.New("db closed")
}

result, err := ei.stmtInsertUniqueTxHash.Exec(txHash.String(), c.String())
if err != nil {
return err
}

affected, err := result.RowsAffected()
if err != nil {
return err
}

if affected == 0 {
// The row already existed, so no insertion occurred
return ErrAlreadyIndexed
}

return err
}

// UpsertHash inserts a new transaction hash and cid into the database,
// and updates the insertion time if the transaction hash already exists.
func (ei *EthTxHashLookup) UpsertHash(txHash ethtypes.EthHash, c cid.Cid) error {
if ei.db == nil {
return xerrors.New("db closed")
Expand Down Expand Up @@ -128,6 +175,22 @@ func (ei *EthTxHashLookup) GetHashFromCid(c cid.Cid) (ethtypes.EthHash, error) {
return ethtypes.ParseEthHash(hashString)
}

// GetLastTransaction returns the most recent transaction hash and cid from the database based on insertion_time
func (ei *EthTxHashLookup) GetLastTransaction() (string, cid.Cid, error) {
if ei.db == nil {
return "", cid.Undef, xerrors.New("db closed")
}

var hashStr, cidStr string
err := ei.stmtGetLastTxHash.QueryRow().Scan(&hashStr, &cidStr)
if err != nil {
return "", cid.Undef, err
}

c, err := cid.Decode(cidStr)
return hashStr, c, err
}

func (ei *EthTxHashLookup) DeleteEntriesOlderThan(days int) (int64, error) {
if ei.db == nil {
return 0, xerrors.New("db closed")
Expand Down
119 changes: 119 additions & 0 deletions chain/ethhashlookup/index.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
package ethhashlookup

import (
"context"
"os"

"github.com/filecoin-project/lotus/chain/store"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/chain/types/ethtypes"
"github.com/filecoin-project/lotus/lib/sqlite"
logging "github.com/ipfs/go-log/v2"
"golang.org/x/xerrors"
)

var log = logging.Logger("txhashindex")

// ChainStore interface; we could use store.ChainStore directly,
// but this simplifies unit testing.
type ChainStore interface {
SubscribeHeadChanges(f store.ReorgNotifee)
GetSecpkMessagesForTipset(ctx context.Context, ts *types.TipSet) ([]*types.SignedMessage, error)
GetHeaviestTipSet() *types.TipSet
GetTipSetFromKey(ctx context.Context, tsk types.TipSetKey) (*types.TipSet, error)
}

var _ ChainStore = (*store.ChainStore)(nil)

// PopulateAfterSnapshot populates the tx hash index after a snapshot is restored.
func PopulateAfterSnapshot(ctx context.Context, path string, cs ChainStore) error {
// if a database already exists, we try to delete it and create a new one
if _, err := os.Stat(path); err == nil {
if err = os.Remove(path); err != nil {
return xerrors.Errorf("tx hash index already exists at %s and can't be deleted", path)
}
}

db, _, err := sqlite.Open(path)
if err != nil {
return xerrors.Errorf("failed to setup tx hash index db: %w", err)
}
defer func() {
if err := db.Close(); err != nil {
log.Errorf("error closing tx hash database: %s", err)
}
}()

if err = sqlite.InitDb(ctx, "message index", db, ddls, []sqlite.MigrationFunc{}); err != nil {
_ = db.Close()
return xerrors.Errorf("error creating tx hash index database: %w", err)
}

tx, err := db.Begin()
if err != nil {
return xerrors.Errorf("error when starting transaction: %w", err)
}

rollback := func() {
if err := tx.Rollback(); err != nil {
log.Errorf("error in rollback: %s", err)
}
}

insertStmt, err := tx.Prepare(insertTxHash)
if err != nil {
rollback()
return xerrors.Errorf("error preparing insertStmt: %w", err)
}

defer func() {
_ = insertStmt.Close()
}()

var (
curTs = cs.GetHeaviestTipSet()
startHeight = curTs.Height()
msgs []*types.SignedMessage
)

for curTs != nil {
msgs, err = cs.GetSecpkMessagesForTipset(ctx, curTs)
if err != nil {
log.Infof("stopping import after %d tipsets", startHeight-curTs.Height())
break
}

for _, m := range msgs {
ethTx, err := ethtypes.EthTransactionFromSignedFilecoinMessage(m)
if err != nil {
rollback()
return err
}

txHash, err := ethTx.TxHash()
if err != nil {
rollback()
return err
}

_, err = insertStmt.Exec(txHash.String(), m.Cid().String())
if err != nil {
rollback()
return err
}
}

curTs, err = cs.GetTipSetFromKey(ctx, curTs.Parents())
if err != nil {
rollback()
return xerrors.Errorf("error walking chain: %w", err)
}
}

err = tx.Commit()
if err != nil {
return xerrors.Errorf("error committing transaction: %w", err)
}

return nil
}
15 changes: 15 additions & 0 deletions chain/store/messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -350,3 +350,18 @@ func (cs *ChainStore) LoadSignedMessagesFromCids(ctx context.Context, cids []cid

return msgs, nil
}

// GetSecpkMessagesForTipset returns all the secpk messages for a tipset
func (cs *ChainStore) GetSecpkMessagesForTipset(ctx context.Context, ts *types.TipSet) ([]*types.SignedMessage, error) {
msgs := make([]*types.SignedMessage, 0)
for _, b := range ts.Blocks() {
secpkmsgs, err := cs.SecpkMessagesForBlock(ctx, b)
if err != nil {
return nil, xerrors.Errorf("failed to get secpk messages for block: %w", err)
}

msgs = append(msgs, secpkmsgs...)
}

return msgs, nil
}
14 changes: 14 additions & 0 deletions cmd/lotus/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"github.com/filecoin-project/lotus/chain/beacon/drand"
"github.com/filecoin-project/lotus/chain/consensus"
"github.com/filecoin-project/lotus/chain/consensus/filcns"
"github.com/filecoin-project/lotus/chain/ethhashlookup"
"github.com/filecoin-project/lotus/chain/index"
proofsffi "github.com/filecoin-project/lotus/chain/proofs/ffi"
"github.com/filecoin-project/lotus/chain/stmgr"
Expand Down Expand Up @@ -650,6 +651,19 @@ func ImportChain(ctx context.Context, r repo.Repo, fname string, snapshot bool)
log.Info("populating message index done")
}

if cfg.Index.EnableAutomaticBackFillTxIndex {
log.Info("back-filling tx index...")
basePath, err := lr.SqlitePath()
if err != nil {
return err
}

if err = ethhashlookup.PopulateAfterSnapshot(ctx, filepath.Join(basePath, ethhashlookup.DefaultDbFilename), cst); err != nil {
return err
}

log.Info("populating tx index done")
}
return nil
}

Expand Down
2 changes: 1 addition & 1 deletion extern/filecoin-ffi
2 changes: 1 addition & 1 deletion node/builder_chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,7 @@ func ConfigFullNode(c interface{}) Option {

If(cfg.Fevm.EnableEthRPC,
Override(new(*full.EthEventHandler), modules.EthEventHandler(cfg.Events, cfg.Fevm.EnableEthRPC)),
Override(new(full.EthModuleAPI), modules.EthModuleAPI(cfg.Fevm)),
Override(new(full.EthModuleAPI), modules.EthModuleAPI(cfg.Fevm, cfg.Index.EnableAutomaticBackFillTxIndex, cfg.Index.MaxAutomaticBackFillTxIndexHeight)),
Override(new(full.EthEventAPI), From(new(*full.EthEventHandler))),
),
If(!cfg.Fevm.EnableEthRPC,
Expand Down
4 changes: 4 additions & 0 deletions node/config/def.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,10 @@ func DefaultFullNode() *FullNode {
MaxFilterResults: 10000,
MaxFilterHeightRange: 2880, // conservative limit of one day
},
Index: IndexConfig{
EnableAutomaticBackFillTxIndex: false,
MaxAutomaticBackFillTxIndexHeight: 5760, // total epochs in 2 days, (30 seconds per epoch)
},
}
}

Expand Down
8 changes: 7 additions & 1 deletion node/config/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ type ApisConfig struct {
}

type JournalConfig struct {
//Events of the form: "system1:event1,system1:event2[,...]"
// Events of the form: "system1:event1,system1:event2[,...]"
DisabledEvents string
}

Expand Down Expand Up @@ -632,6 +632,12 @@ type IndexConfig struct {
// EXPERIMENTAL FEATURE. USE WITH CAUTION
// EnableMsgIndex enables indexing of messages on chain.
EnableMsgIndex bool

// EnableAutomaticBackFillTxIndex enables automatic index back-filling
EnableAutomaticBackFillTxIndex bool

// Maximum number of blocks to process during a single back-fill operation
MaxAutomaticBackFillTxIndexHeight uint64
}

type HarmonyDB struct {
Expand Down
Loading
Loading