From 8908e07ce09903f1f9bc479eb20371ebb85c0858 Mon Sep 17 00:00:00 2001 From: Aryan Tikarya Date: Tue, 17 Sep 2024 16:34:45 +0530 Subject: [PATCH 1/6] feat: add lotus-shed command for backfilling chain indexer --- cmd/lotus-shed/indexes.go | 128 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 128 insertions(+) diff --git a/cmd/lotus-shed/indexes.go b/cmd/lotus-shed/indexes.go index 07c934d5162..494bbe02b92 100644 --- a/cmd/lotus-shed/indexes.go +++ b/cmd/lotus-shed/indexes.go @@ -3,6 +3,7 @@ package main import ( "context" "database/sql" + "encoding/json" "fmt" "math" "path" @@ -34,6 +35,8 @@ const ( insertEntry = `INSERT OR IGNORE INTO event_entry(event_id, indexed, flags, key, codec, value) VALUES(?, ?, ?, ?, ?, ?)` upsertEventsSeen = `INSERT INTO events_seen(height, tipset_key_cid, reverted) VALUES(?, ?, false) ON CONFLICT(height, tipset_key_cid) DO UPDATE SET reverted=false` tipsetSeen = `SELECT height,reverted FROM events_seen WHERE tipset_key_cid=?` + + getEthTxHashCountForTipset = `SELECT COUNT(*) FROM eth_tx_hash WHERE message_cid IN (SELECT message_cid FROM tipset_message WHERE tipset_key_cid = ? AND reverted = 0)` ) func withCategory(cat string, cmd *cli.Command) *cli.Command { @@ -51,6 +54,7 @@ var indexesCmd = &cli.Command{ withCategory("txhash", backfillTxHashCmd), withCategory("events", backfillEventsCmd), withCategory("events", inspectEventsCmd), + withCategory("chainindex_backfill", backfillChainIndexCmd), }, } @@ -879,3 +883,127 @@ var backfillTxHashCmd = &cli.Command{ return nil }, } + +type IndexValidationJSON struct { + TipsetKey types.TipSetKey `json:"tipset_key"` + Height uint64 `json:"height"` + NonRevertedMessageCount uint64 `json:"non_reverted_message_count"` + NonRevertedEventsCount uint64 `json:"NonRevertedEventsCount"` +} + +var backfillChainIndexCmd = &cli.Command{ + Name: "backfill-chainindex", + Usage: "Backfills the chainindex for a number of epochs starting from a specified height", + Flags: []cli.Flag{ + &cli.IntFlag{ + Name: "from", + Usage: "the tipset height (epoch) to start backfilling from (0 is head of chain)", + }, + &cli.IntFlag{ + Name: "to", + Usage: "the tipset height (epoch) to end backfilling at", + Required: true, + }, + &cli.BoolFlag{ + Name: "output", + Usage: "output the backfilling results in JSON format", + Value: false, + }, + }, + Action: func(cctx *cli.Context) error { + srv, err := lcli.GetFullNodeServices(cctx) + if err != nil { + return fmt.Errorf("failed to get full node services: %w", err) + } + + defer func() { + if closeErr := srv.Close(); closeErr != nil { + log.Errorf("error closing services: %v", closeErr) + } + }() + + api := srv.FullNodeAPI() + ctx := lcli.ReqContext(cctx) + + fromEpoch := cctx.Int("from") + if fromEpoch == 0 { + curTs, err := api.ChainHead(ctx) + if err != nil { + return err + } + fromEpoch = int(curTs.Height()) - 1 + } + + toEpoch := cctx.Int("to") + if toEpoch > fromEpoch { + return fmt.Errorf("to epoch must be less than from epoch") + } + + output := cctx.Bool("output") + + results := make([]IndexValidationJSON, 0, fromEpoch-toEpoch+1) + + log.Infof("Starting backfill from epoch: %d to epoch: %d", fromEpoch, toEpoch) + + backfill := true + // start backfilling from the fromEpoch and go back to the toEpoch + for epoch := fromEpoch; epoch >= toEpoch; epoch-- { + select { + case <-ctx.Done(): + log.Warn("Backfilling process was canceled") + return ctx.Err() + default: + } + + indexValidate, err := api.ChainValidateIndex(ctx, abi.ChainEpoch(epoch), backfill) + if err != nil { + return fmt.Errorf("failed to backfill index for epoch %d: %w", epoch, err) + } + + if output { + results = append(results, IndexValidationJSON{ + TipsetKey: indexValidate.TipSetKey, + Height: indexValidate.Height, + NonRevertedMessageCount: indexValidate.IndexedMessagesCount, + NonRevertedEventsCount: indexValidate.IndexedEventsCount, + }) + } else { + logEpochResult(epoch, indexValidate) + } + } + + if output { + if err := outputResults(results); err != nil { + return err + } + } else { + log.Infof("Backfilling chain index from epoch %d to %d completed.", fromEpoch, toEpoch) + } + + return nil + }, +} + +// outputResults marshals the results into JSON and outputs them. +func outputResults(results []IndexValidationJSON) error { + jsonData, err := json.MarshalIndent(results, "", " ") + if err != nil { + return fmt.Errorf("failed to marshal results to JSON: %w", err) + } + fmt.Println(string(jsonData)) + return nil +} + +// logEpochResult logs the result of backfilling for a single epoch. +func logEpochResult(epoch int, indexValidate *types.IndexValidation) { + if indexValidate.Backfilled { + log.Infof("Epoch %d: Backfilled successfully. TipsetKey: %s, TotalMessages: %d, TotalEvents: %d", + epoch, + indexValidate.TipSetKey, + indexValidate.IndexedMessagesCount, + indexValidate.IndexedMessagesCount, + ) + } else { + log.Infof("Epoch %d: Validation issues detected", epoch) + } +} From b37291e3ceb95df8fb9261ff616644a02bf8ad28 Mon Sep 17 00:00:00 2001 From: Aryan Tikarya Date: Tue, 17 Sep 2024 18:01:04 +0530 Subject: [PATCH 2/6] feat: add lotus-shed command for inspecting the chain indexer --- cmd/lotus-shed/indexes.go | 225 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 225 insertions(+) diff --git a/cmd/lotus-shed/indexes.go b/cmd/lotus-shed/indexes.go index 494bbe02b92..8b769e2a89e 100644 --- a/cmd/lotus-shed/indexes.go +++ b/cmd/lotus-shed/indexes.go @@ -11,6 +11,7 @@ import ( "strings" "time" + "github.com/ipfs/go-cid" "github.com/mitchellh/go-homedir" "github.com/urfave/cli/v2" "golang.org/x/xerrors" @@ -21,6 +22,7 @@ import ( "github.com/filecoin-project/go-state-types/exitcode" lapi "github.com/filecoin-project/lotus/api" + "github.com/filecoin-project/lotus/chain/index" "github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/chain/types/ethtypes" lcli "github.com/filecoin-project/lotus/cli" @@ -37,6 +39,13 @@ const ( tipsetSeen = `SELECT height,reverted FROM events_seen WHERE tipset_key_cid=?` getEthTxHashCountForTipset = `SELECT COUNT(*) FROM eth_tx_hash WHERE message_cid IN (SELECT message_cid FROM tipset_message WHERE tipset_key_cid = ? AND reverted = 0)` + getTotalEventEntries = ` + SELECT COUNT(*) + FROM event_entry ee + JOIN event e ON ee.event_id = e.event_id + JOIN tipset_message tm ON e.message_id = tm.message_id + WHERE tm.tipset_key_cid = ? AND tm.message_cid IS NOT NULL + ` ) func withCategory(cat string, cmd *cli.Command) *cli.Command { @@ -55,6 +64,7 @@ var indexesCmd = &cli.Command{ withCategory("events", backfillEventsCmd), withCategory("events", inspectEventsCmd), withCategory("chainindex_backfill", backfillChainIndexCmd), + withCategory("chainindex_inspect", inspectChainIndexCmd), }, } @@ -891,6 +901,221 @@ type IndexValidationJSON struct { NonRevertedEventsCount uint64 `json:"NonRevertedEventsCount"` } +var inspectChainIndexCmd = &cli.Command{ + Name: "inspect-chainindex", + Usage: "Inspects the chainindex for a given tipset", + Flags: []cli.Flag{ + &cli.IntFlag{ + Name: "from", + Usage: "the tipset height (epoch) to start backfilling from (0 is head of chain)", + }, + &cli.IntFlag{ + Name: "to", + Usage: "the tipset height (epoch) to end backfilling at", + }, + &cli.BoolFlag{ + Name: "log-good", + Usage: "log tipsets that have no detected problems", + Value: false, + }, + }, + Action: func(cctx *cli.Context) error { + srv, err := lcli.GetFullNodeServices(cctx) + if err != nil { + return fmt.Errorf("failed to get full node services: %w", err) + } + + defer func() { + if closeErr := srv.Close(); closeErr != nil { + log.Errorf("error closing services: %v", closeErr) + } + }() + + api := srv.FullNodeAPI() + ctx := lcli.ReqContext(cctx) + + fromEpoch := cctx.Int("from") + if fromEpoch == 0 { + curTs, err := api.ChainHead(ctx) + if err != nil { + return err + } + fromEpoch = int(curTs.Height()) - 1 + } else { + fromEpoch = fromEpoch - 1 // because the events of the current tipset will be indexed in the next tipset + } + + toEpoch := cctx.Int("to") + if toEpoch > fromEpoch { + return fmt.Errorf("to epoch must be less than from epoch") + } + + logGood := cctx.Bool("log-good") + + basePath, err := homedir.Expand(cctx.String("repo")) + if err != nil { + return err + } + + dbPath := filepath.Join(basePath, "sqlite", index.DefaultDbFilename) + db, err := sql.Open("sqlite3", dbPath) + if err != nil { + return err + } + + stmtGetTxHashCount, err := db.Prepare(getEthTxHashCountForTipset) + if err != nil { + return err + } + + stmtGetTotalEventEntries, err := db.Prepare(getTotalEventEntries) + if err != nil { + return err + } + + defer func() { + stmtGetTxHashCount.Close() + stmtGetTotalEventEntries.Close() + + err := db.Close() + if err != nil { + log.Errorf("Error closing db: %v", err) + } + }() + + inspectData := func(ctx context.Context, currTs *types.TipSet, msgs []lapi.Message, receipts []*types.MessageReceipt, indexValidateResp *types.IndexValidation) error { + tsKeyCid, err := currTs.Key().Cid() + if err != nil { + return fmt.Errorf("failed to get tipset key cid: %w", err) + } + + var ( + problems []string + epoch = currTs.Height() + ) + + if indexValidateResp.IndexedMessagesCount != uint64(len(msgs)) { + problems = append(problems, fmt.Sprintf("epoch %d: total messages mismatch: indexed count %d, chainstore count %d", epoch, indexValidateResp.IndexedMessagesCount, len(msgs))) + } + + var ( + expectEvents int + expectEntries int + ) + for _, receipt := range receipts { + if receipt.ExitCode != exitcode.Ok || receipt.EventsRoot == nil { + continue + } + events, err := api.ChainGetEvents(ctx, *receipt.EventsRoot) + if err != nil { + return fmt.Errorf("failed to load events for tipset %s: %w", currTs, err) + } + expectEvents += len(events) + for _, event := range events { + expectEntries += len(event.Entries) + } + } + + if indexValidateResp.IndexedEventsCount != uint64(expectEvents) { + problems = append(problems, fmt.Sprintf("epoch %d: total events mismatch: indexed count %d, chainstore count %d", epoch, indexValidateResp.IndexedEventsCount, expectEvents)) + } + + var actualEventEntries int + if err = stmtGetTotalEventEntries.QueryRowContext(ctx, currTs.Key().String()).Scan(&actualEventEntries); err != nil { + return fmt.Errorf("failed to get total event entries for tipset %s: %w", currTs, err) + } + + if expectEntries != actualEventEntries { + problems = append(problems, fmt.Sprintf("epoch %d: total event entries mismatch: indexed count %d, chainstore count %d", epoch, actualEventEntries, expectEntries)) + } + + var expectedTxHashes int + for _, blockHeader := range currTs.Blocks() { + blkMsgs, err := api.ChainGetBlockMessages(ctx, blockHeader.Cid()) + if err != nil { + return fmt.Errorf("failed to get block messages for block %s: %w", blockHeader.Cid(), err) + } + + for _, smsg := range blkMsgs.SecpkMessages { + if smsg.Signature.Type != crypto.SigTypeDelegated { + continue + } + + tx, err := ethtypes.EthTransactionFromSignedFilecoinMessage(smsg) + if err != nil { + return fmt.Errorf("failed to convert from signed message: %w at epoch: %d", err, epoch) + } + + if _, err = tx.TxHash(); err != nil { + return fmt.Errorf("failed to calculate hash for ethTx: %w at epoch: %d", err, epoch) + } + + expectedTxHashes++ + } + } + + var actualTxHashes int + if err = stmtGetTxHashCount.QueryRowContext(ctx, currTs.Key().String()).Scan(&actualTxHashes); err != nil { + return fmt.Errorf("failed to get tx hash count for tipset %s: %w", currTs, err) + } + + if expectedTxHashes != actualTxHashes { + problems = append(problems, fmt.Sprintf("epoch %d: total tx hashes mismatch: indexed count %d, chainstore count %d", epoch, actualTxHashes, expectedTxHashes)) + } + + if len(problems) > 0 { + log.Warnf("✗ Epoch %d (%s): %v", epoch, tsKeyCid, problems) + } else if logGood { + log.Infof("✓ Epoch %d (%s)", epoch, tsKeyCid) + } + + return nil + } + + var blockCID cid.Cid + for epoch := fromEpoch; ctx.Err() == nil && epoch >= toEpoch; epoch-- { + indexValidateResp, err := api.ChainValidateIndex(ctx, abi.ChainEpoch(epoch), false) + if err != nil { + return fmt.Errorf("failed to validate index for epoch %d: %w", epoch, err) + } + + if !indexValidateResp.TipSetKey.IsEmpty() || indexValidateResp.Height != uint64(epoch) { + return fmt.Errorf("epoch %d: invalid index validation: %+v", epoch, indexValidateResp) + } + + prevTs, err := api.ChainGetTipSetByHeight(ctx, abi.ChainEpoch(epoch), types.EmptyTSK) + if err != nil { + return fmt.Errorf("failed to get tipset at epoch %d: %w", epoch, err) + } + + blockCID = prevTs.Blocks()[0].Cid() + // get receipts for the parent of the prevTs (which will be currTs) + msgs, err := api.ChainGetParentMessages(ctx, blockCID) + if err != nil { + return fmt.Errorf("failed to get parent messages at epoch %d: %w", epoch, err) + } + + // get receipts for the parent of prevTs (which will be currTs) + receipts, err := api.ChainGetParentReceipts(ctx, blockCID) + if err != nil { + return fmt.Errorf("failed to get parent receipts at epoch %d: %w", epoch, err) + } + + // parent of the prevTs is the currTs, because we are walking backwards and started fromEpoch - 1 + currTs, err := api.ChainGetTipSet(ctx, prevTs.Parents()) + if err != nil { + return fmt.Errorf("failed to get parent tipset at epoch %d: %w", epoch, err) + } + + err = inspectData(ctx, currTs, msgs, receipts, indexValidateResp) + if err != nil { + return err + } + } + return nil + }, +} + var backfillChainIndexCmd = &cli.Command{ Name: "backfill-chainindex", Usage: "Backfills the chainindex for a number of epochs starting from a specified height", From 0dc4dba52aff3f808e084b5006c5cb9200ea303c Mon Sep 17 00:00:00 2001 From: Aryan Tikarya Date: Tue, 17 Sep 2024 23:38:23 +0530 Subject: [PATCH 3/6] feat: use single lotus-shed command to inspect and backfill --- cmd/lotus-shed/indexes.go | 326 +++++++++----------------------------- 1 file changed, 79 insertions(+), 247 deletions(-) diff --git a/cmd/lotus-shed/indexes.go b/cmd/lotus-shed/indexes.go index 8b769e2a89e..19299ab9c4d 100644 --- a/cmd/lotus-shed/indexes.go +++ b/cmd/lotus-shed/indexes.go @@ -11,7 +11,6 @@ import ( "strings" "time" - "github.com/ipfs/go-cid" "github.com/mitchellh/go-homedir" "github.com/urfave/cli/v2" "golang.org/x/xerrors" @@ -22,7 +21,6 @@ import ( "github.com/filecoin-project/go-state-types/exitcode" lapi "github.com/filecoin-project/lotus/api" - "github.com/filecoin-project/lotus/chain/index" "github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/chain/types/ethtypes" lcli "github.com/filecoin-project/lotus/cli" @@ -63,8 +61,7 @@ var indexesCmd = &cli.Command{ withCategory("txhash", backfillTxHashCmd), withCategory("events", backfillEventsCmd), withCategory("events", inspectEventsCmd), - withCategory("chainindex_backfill", backfillChainIndexCmd), - withCategory("chainindex_inspect", inspectChainIndexCmd), + withCategory("chainindex_validation", validateChainIndexCmd), }, } @@ -895,261 +892,57 @@ var backfillTxHashCmd = &cli.Command{ } type IndexValidationJSON struct { - TipsetKey types.TipSetKey `json:"tipset_key"` - Height uint64 `json:"height"` - NonRevertedMessageCount uint64 `json:"non_reverted_message_count"` - NonRevertedEventsCount uint64 `json:"NonRevertedEventsCount"` + IndexedTipsetKey types.TipSetKey `json:"indexed_tipset_key"` + IndexedHeight uint64 `json:"indexed_height"` + IndexedMsgCount uint64 `json:"indexed_message_count"` + IndexedEventsCount uint64 `json:"indexed_events_count"` } -var inspectChainIndexCmd = &cli.Command{ - Name: "inspect-chainindex", - Usage: "Inspects the chainindex for a given tipset", +var validateChainIndexCmd = &cli.Command{ + Name: "validate-chainindex ", + Usage: "Validates the chainindex for a range of tipset epochs", Flags: []cli.Flag{ &cli.IntFlag{ Name: "from", - Usage: "the tipset height (epoch) to start backfilling from (0 is head of chain)", + Usage: "The tipset height (epoch) to start backfilling from (0 is head of chain)", }, &cli.IntFlag{ - Name: "to", - Usage: "the tipset height (epoch) to end backfilling at", + Name: "to", + Usage: "The tipset height (epoch) to end backfilling at", + Required: true, }, &cli.BoolFlag{ - Name: "log-good", - Usage: "log tipsets that have no detected problems", - Value: false, + Name: "backfill", + Usage: "Backfill missing index entries while validating the chain index. When enabled, the command will perform backfilling for any missing indexes (default: true)", + Value: true, }, - }, - Action: func(cctx *cli.Context) error { - srv, err := lcli.GetFullNodeServices(cctx) - if err != nil { - return fmt.Errorf("failed to get full node services: %w", err) - } - - defer func() { - if closeErr := srv.Close(); closeErr != nil { - log.Errorf("error closing services: %v", closeErr) - } - }() - - api := srv.FullNodeAPI() - ctx := lcli.ReqContext(cctx) - - fromEpoch := cctx.Int("from") - if fromEpoch == 0 { - curTs, err := api.ChainHead(ctx) - if err != nil { - return err - } - fromEpoch = int(curTs.Height()) - 1 - } else { - fromEpoch = fromEpoch - 1 // because the events of the current tipset will be indexed in the next tipset - } - - toEpoch := cctx.Int("to") - if toEpoch > fromEpoch { - return fmt.Errorf("to epoch must be less than from epoch") - } - - logGood := cctx.Bool("log-good") - - basePath, err := homedir.Expand(cctx.String("repo")) - if err != nil { - return err - } - - dbPath := filepath.Join(basePath, "sqlite", index.DefaultDbFilename) - db, err := sql.Open("sqlite3", dbPath) - if err != nil { - return err - } - - stmtGetTxHashCount, err := db.Prepare(getEthTxHashCountForTipset) - if err != nil { - return err - } - - stmtGetTotalEventEntries, err := db.Prepare(getTotalEventEntries) - if err != nil { - return err - } - - defer func() { - stmtGetTxHashCount.Close() - stmtGetTotalEventEntries.Close() - - err := db.Close() - if err != nil { - log.Errorf("Error closing db: %v", err) - } - }() - - inspectData := func(ctx context.Context, currTs *types.TipSet, msgs []lapi.Message, receipts []*types.MessageReceipt, indexValidateResp *types.IndexValidation) error { - tsKeyCid, err := currTs.Key().Cid() - if err != nil { - return fmt.Errorf("failed to get tipset key cid: %w", err) - } - - var ( - problems []string - epoch = currTs.Height() - ) - - if indexValidateResp.IndexedMessagesCount != uint64(len(msgs)) { - problems = append(problems, fmt.Sprintf("epoch %d: total messages mismatch: indexed count %d, chainstore count %d", epoch, indexValidateResp.IndexedMessagesCount, len(msgs))) - } - - var ( - expectEvents int - expectEntries int - ) - for _, receipt := range receipts { - if receipt.ExitCode != exitcode.Ok || receipt.EventsRoot == nil { - continue - } - events, err := api.ChainGetEvents(ctx, *receipt.EventsRoot) - if err != nil { - return fmt.Errorf("failed to load events for tipset %s: %w", currTs, err) - } - expectEvents += len(events) - for _, event := range events { - expectEntries += len(event.Entries) - } - } - - if indexValidateResp.IndexedEventsCount != uint64(expectEvents) { - problems = append(problems, fmt.Sprintf("epoch %d: total events mismatch: indexed count %d, chainstore count %d", epoch, indexValidateResp.IndexedEventsCount, expectEvents)) - } - - var actualEventEntries int - if err = stmtGetTotalEventEntries.QueryRowContext(ctx, currTs.Key().String()).Scan(&actualEventEntries); err != nil { - return fmt.Errorf("failed to get total event entries for tipset %s: %w", currTs, err) - } - - if expectEntries != actualEventEntries { - problems = append(problems, fmt.Sprintf("epoch %d: total event entries mismatch: indexed count %d, chainstore count %d", epoch, actualEventEntries, expectEntries)) - } - - var expectedTxHashes int - for _, blockHeader := range currTs.Blocks() { - blkMsgs, err := api.ChainGetBlockMessages(ctx, blockHeader.Cid()) - if err != nil { - return fmt.Errorf("failed to get block messages for block %s: %w", blockHeader.Cid(), err) - } - - for _, smsg := range blkMsgs.SecpkMessages { - if smsg.Signature.Type != crypto.SigTypeDelegated { - continue - } - - tx, err := ethtypes.EthTransactionFromSignedFilecoinMessage(smsg) - if err != nil { - return fmt.Errorf("failed to convert from signed message: %w at epoch: %d", err, epoch) - } - - if _, err = tx.TxHash(); err != nil { - return fmt.Errorf("failed to calculate hash for ethTx: %w at epoch: %d", err, epoch) - } - - expectedTxHashes++ - } - } - - var actualTxHashes int - if err = stmtGetTxHashCount.QueryRowContext(ctx, currTs.Key().String()).Scan(&actualTxHashes); err != nil { - return fmt.Errorf("failed to get tx hash count for tipset %s: %w", currTs, err) - } - - if expectedTxHashes != actualTxHashes { - problems = append(problems, fmt.Sprintf("epoch %d: total tx hashes mismatch: indexed count %d, chainstore count %d", epoch, actualTxHashes, expectedTxHashes)) - } - - if len(problems) > 0 { - log.Warnf("✗ Epoch %d (%s): %v", epoch, tsKeyCid, problems) - } else if logGood { - log.Infof("✓ Epoch %d (%s)", epoch, tsKeyCid) - } - - return nil - } - - var blockCID cid.Cid - for epoch := fromEpoch; ctx.Err() == nil && epoch >= toEpoch; epoch-- { - indexValidateResp, err := api.ChainValidateIndex(ctx, abi.ChainEpoch(epoch), false) - if err != nil { - return fmt.Errorf("failed to validate index for epoch %d: %w", epoch, err) - } - - if !indexValidateResp.TipSetKey.IsEmpty() || indexValidateResp.Height != uint64(epoch) { - return fmt.Errorf("epoch %d: invalid index validation: %+v", epoch, indexValidateResp) - } - - prevTs, err := api.ChainGetTipSetByHeight(ctx, abi.ChainEpoch(epoch), types.EmptyTSK) - if err != nil { - return fmt.Errorf("failed to get tipset at epoch %d: %w", epoch, err) - } - - blockCID = prevTs.Blocks()[0].Cid() - // get receipts for the parent of the prevTs (which will be currTs) - msgs, err := api.ChainGetParentMessages(ctx, blockCID) - if err != nil { - return fmt.Errorf("failed to get parent messages at epoch %d: %w", epoch, err) - } - - // get receipts for the parent of prevTs (which will be currTs) - receipts, err := api.ChainGetParentReceipts(ctx, blockCID) - if err != nil { - return fmt.Errorf("failed to get parent receipts at epoch %d: %w", epoch, err) - } - - // parent of the prevTs is the currTs, because we are walking backwards and started fromEpoch - 1 - currTs, err := api.ChainGetTipSet(ctx, prevTs.Parents()) - if err != nil { - return fmt.Errorf("failed to get parent tipset at epoch %d: %w", epoch, err) - } - - err = inspectData(ctx, currTs, msgs, receipts, indexValidateResp) - if err != nil { - return err - } - } - return nil - }, -} - -var backfillChainIndexCmd = &cli.Command{ - Name: "backfill-chainindex", - Usage: "Backfills the chainindex for a number of epochs starting from a specified height", - Flags: []cli.Flag{ - &cli.IntFlag{ - Name: "from", - Usage: "the tipset height (epoch) to start backfilling from (0 is head of chain)", - }, - &cli.IntFlag{ - Name: "to", - Usage: "the tipset height (epoch) to end backfilling at", - Required: true, + &cli.BoolFlag{ + Name: "failfast", + Usage: "Failfast when enabled, the validation process will terminate immediately upon encountering the first error (default: true)", + Value: true, }, &cli.BoolFlag{ Name: "output", - Usage: "output the backfilling results in JSON format", + Usage: "Output the backfilling results in JSON format", Value: false, }, }, Action: func(cctx *cli.Context) error { + // Initialize Full Node Services srv, err := lcli.GetFullNodeServices(cctx) if err != nil { return fmt.Errorf("failed to get full node services: %w", err) } - defer func() { if closeErr := srv.Close(); closeErr != nil { - log.Errorf("error closing services: %v", closeErr) + log.Errorf("Error closing services: %v", closeErr) } }() api := srv.FullNodeAPI() ctx := lcli.ReqContext(cctx) + // Determine Starting Epoch fromEpoch := cctx.Int("from") if fromEpoch == 0 { curTs, err := api.ChainHead(ctx) @@ -1157,52 +950,86 @@ var backfillChainIndexCmd = &cli.Command{ return err } fromEpoch = int(curTs.Height()) - 1 + } else { + fromEpoch = fromEpoch - 1 } + // Determine Ending Epoch toEpoch := cctx.Int("to") if toEpoch > fromEpoch { return fmt.Errorf("to epoch must be less than from epoch") } + // Flags + backfill := cctx.Bool("backfill") + failfast := cctx.Bool("failfast") output := cctx.Bool("output") - results := make([]IndexValidationJSON, 0, fromEpoch-toEpoch+1) + // Results Tracking + var results []IndexValidationJSON + var backfilledEpochs []int - log.Infof("Starting backfill from epoch: %d to epoch: %d", fromEpoch, toEpoch) + log.Infof("Starting %s from epoch: %d to epoch: %d", + func() string { + if backfill { + return "backfill and inspect chainindex" + } + return "inspect chainindex" + }(), + fromEpoch, toEpoch) - backfill := true - // start backfilling from the fromEpoch and go back to the toEpoch + // Iterate Over Epochs for epoch := fromEpoch; epoch >= toEpoch; epoch-- { - select { - case <-ctx.Done(): - log.Warn("Backfilling process was canceled") + if ctx.Err() != nil { return ctx.Err() - default: } - indexValidate, err := api.ChainValidateIndex(ctx, abi.ChainEpoch(epoch), backfill) + indexValidateResp, err := api.ChainValidateIndex(ctx, abi.ChainEpoch(epoch), backfill) if err != nil { - return fmt.Errorf("failed to backfill index for epoch %d: %w", epoch, err) + if failfast { + return fmt.Errorf("failed to validate index for epoch %d: %w", epoch, err) + } + log.Warnf("Error validating index for epoch %d: %v", epoch, err) + continue + } + + if !indexValidateResp.TipSetKey.IsEmpty() || indexValidateResp.Height != uint64(epoch) && indexValidateResp.Backfilled == backfill { + errMsg := fmt.Sprintf("epoch %d: invalid index validation response: %+v", epoch, indexValidateResp) + if failfast { + return fmt.Errorf(errMsg) + } + log.Warn(errMsg) + continue + } + + if backfill { + backfilledEpochs = append(backfilledEpochs, epoch) } if output { results = append(results, IndexValidationJSON{ - TipsetKey: indexValidate.TipSetKey, - Height: indexValidate.Height, - NonRevertedMessageCount: indexValidate.IndexedMessagesCount, - NonRevertedEventsCount: indexValidate.IndexedEventsCount, + IndexedTipsetKey: indexValidateResp.TipSetKey, + IndexedHeight: indexValidateResp.Height, + IndexedMsgCount: indexValidateResp.IndexedMessagesCount, + IndexedEventsCount: indexValidateResp.IndexedEventsCount, }) } else { - logEpochResult(epoch, indexValidate) + logEpochResult(epoch, indexValidateResp) } } + // Output JSON Results if output { if err := outputResults(results); err != nil { return err } + } + + // Log Summary + if backfill { + log.Infof("Backfilled epochs: %v", backfilledEpochs) } else { - log.Infof("Backfilling chain index from epoch %d to %d completed.", fromEpoch, toEpoch) + log.Infof("Inspection of chain index from epoch %d to %d completed.", fromEpoch, toEpoch) } return nil @@ -1229,6 +1056,11 @@ func logEpochResult(epoch int, indexValidate *types.IndexValidation) { indexValidate.IndexedMessagesCount, ) } else { - log.Infof("Epoch %d: Validation issues detected", epoch) + log.Info("Epoch %d: validated successfully. TipsetKey: %s, TotalMessages: %d, TotalEvents: %d", + epoch, + indexValidate.TipSetKey, + indexValidate.IndexedMessagesCount, + indexValidate.IndexedMessagesCount, + ) } } From 2777272f9fc8d95069b0786fb9a0b3489e7ae7da Mon Sep 17 00:00:00 2001 From: Aryan Tikarya Date: Tue, 17 Sep 2024 23:40:14 +0530 Subject: [PATCH 4/6] fix: remove the unused queries --- cmd/lotus-shed/indexes.go | 9 --------- 1 file changed, 9 deletions(-) diff --git a/cmd/lotus-shed/indexes.go b/cmd/lotus-shed/indexes.go index 19299ab9c4d..d74e75d9afb 100644 --- a/cmd/lotus-shed/indexes.go +++ b/cmd/lotus-shed/indexes.go @@ -35,15 +35,6 @@ const ( insertEntry = `INSERT OR IGNORE INTO event_entry(event_id, indexed, flags, key, codec, value) VALUES(?, ?, ?, ?, ?, ?)` upsertEventsSeen = `INSERT INTO events_seen(height, tipset_key_cid, reverted) VALUES(?, ?, false) ON CONFLICT(height, tipset_key_cid) DO UPDATE SET reverted=false` tipsetSeen = `SELECT height,reverted FROM events_seen WHERE tipset_key_cid=?` - - getEthTxHashCountForTipset = `SELECT COUNT(*) FROM eth_tx_hash WHERE message_cid IN (SELECT message_cid FROM tipset_message WHERE tipset_key_cid = ? AND reverted = 0)` - getTotalEventEntries = ` - SELECT COUNT(*) - FROM event_entry ee - JOIN event e ON ee.event_id = e.event_id - JOIN tipset_message tm ON e.message_id = tm.message_id - WHERE tm.tipset_key_cid = ? AND tm.message_cid IS NOT NULL - ` ) func withCategory(cat string, cmd *cli.Command) *cli.Command { From fa30f2343034345e87045247992a103dba03571e Mon Sep 17 00:00:00 2001 From: Aryan Tikarya Date: Wed, 18 Sep 2024 10:05:29 +0530 Subject: [PATCH 5/6] small changes --- cmd/lotus-shed/indexes.go | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/cmd/lotus-shed/indexes.go b/cmd/lotus-shed/indexes.go index d74e75d9afb..2abe186542f 100644 --- a/cmd/lotus-shed/indexes.go +++ b/cmd/lotus-shed/indexes.go @@ -919,7 +919,6 @@ var validateChainIndexCmd = &cli.Command{ }, }, Action: func(cctx *cli.Context) error { - // Initialize Full Node Services srv, err := lcli.GetFullNodeServices(cctx) if err != nil { return fmt.Errorf("failed to get full node services: %w", err) @@ -933,7 +932,6 @@ var validateChainIndexCmd = &cli.Command{ api := srv.FullNodeAPI() ctx := lcli.ReqContext(cctx) - // Determine Starting Epoch fromEpoch := cctx.Int("from") if fromEpoch == 0 { curTs, err := api.ChainHead(ctx) @@ -945,13 +943,15 @@ var validateChainIndexCmd = &cli.Command{ fromEpoch = fromEpoch - 1 } - // Determine Ending Epoch + if fromEpoch <= 0 { + return fmt.Errorf("invalid from epoch: %d", fromEpoch) + } + toEpoch := cctx.Int("to") if toEpoch > fromEpoch { return fmt.Errorf("to epoch must be less than from epoch") } - // Flags backfill := cctx.Bool("backfill") failfast := cctx.Bool("failfast") output := cctx.Bool("output") @@ -969,7 +969,8 @@ var validateChainIndexCmd = &cli.Command{ }(), fromEpoch, toEpoch) - // Iterate Over Epochs + // starting from the FromEpoch-1 and going down to the ToEpoch + // this is because `ChainValidateIndex` fetches the tipset.height()+1, which might not be available in case of chain head for epoch := fromEpoch; epoch >= toEpoch; epoch-- { if ctx.Err() != nil { return ctx.Err() From cb141d7f26eba5cc4c3188c764bb61b03c53323d Mon Sep 17 00:00:00 2001 From: Aryan Tikarya Date: Wed, 18 Sep 2024 11:13:37 +0530 Subject: [PATCH 6/6] add change log --- CHANGELOG.md | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 669129d5af5..f3197464ea5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,7 +1,8 @@ # Lotus changelog # UNRELEASED - + - https://github.com/filecoin-project/lotus/pull/12474: feat: lotus-shed tooling for chain indexer + ## ☢️ Upgrade Warnings ☢️ ## New features