Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: lotus-shed tooling for chain indexer #12474

Merged
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
176 changes: 176 additions & 0 deletions cmd/lotus-shed/indexes.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package main
import (
"context"
"database/sql"
"encoding/json"
"fmt"
"math"
"path"
Expand Down Expand Up @@ -51,6 +52,7 @@ var indexesCmd = &cli.Command{
withCategory("txhash", backfillTxHashCmd),
withCategory("events", backfillEventsCmd),
withCategory("events", inspectEventsCmd),
withCategory("chainindex_validation", validateChainIndexCmd),
},
}

Expand Down Expand Up @@ -879,3 +881,177 @@ var backfillTxHashCmd = &cli.Command{
return nil
},
}

type IndexValidationJSON struct {
IndexedTipsetKey types.TipSetKey `json:"indexed_tipset_key"`
IndexedHeight uint64 `json:"indexed_height"`
IndexedMsgCount uint64 `json:"indexed_message_count"`
IndexedEventsCount uint64 `json:"indexed_events_count"`
}

var validateChainIndexCmd = &cli.Command{
Name: "validate-chainindex ",
Usage: "Validates the chainindex for a range of tipset epochs",
Flags: []cli.Flag{
&cli.IntFlag{
Name: "from",
Usage: "The tipset height (epoch) to start backfilling from (0 is head of chain)",
},
&cli.IntFlag{
Name: "to",
Usage: "The tipset height (epoch) to end backfilling at",
Required: true,
},
&cli.BoolFlag{
Name: "backfill",
Usage: "Backfill missing index entries while validating the chain index. When enabled, the command will perform backfilling for any missing indexes (default: true)",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"will perform backfilling for any missing epochs in the index"

Value: true,
},
&cli.BoolFlag{
Name: "failfast",
Usage: "Failfast when enabled, the validation process will terminate immediately upon encountering the first error (default: true)",
Value: true,
},
&cli.BoolFlag{
Name: "output",
Usage: "Output the backfilling results in JSON format",
Value: false,
},
},
Action: func(cctx *cli.Context) error {
// Initialize Full Node Services
srv, err := lcli.GetFullNodeServices(cctx)
if err != nil {
return fmt.Errorf("failed to get full node services: %w", err)
}
defer func() {
if closeErr := srv.Close(); closeErr != nil {
log.Errorf("Error closing services: %v", closeErr)
}
}()

api := srv.FullNodeAPI()
ctx := lcli.ReqContext(cctx)

// Determine Starting Epoch
fromEpoch := cctx.Int("from")
if fromEpoch == 0 {
curTs, err := api.ChainHead(ctx)
if err != nil {
return err
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

need an informative error message

}
fromEpoch = int(curTs.Height()) - 1
} else {
fromEpoch = fromEpoch - 1
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do we need this ?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Only do this is fromEpoch >= head.Height()

Copy link
Contributor Author

@akaladarshi akaladarshi Sep 18, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was done because ChainValidateIndex fetches event of ts from ts+1, So in case of fromEpoch == head.height(), we can't get ts.Height() + 1.

So due to nature of ChainValidateIndex by default fromEpoch will not be included in backfilling, and to maintain consistency I did fromEpoch -1.

}

// Determine Ending Epoch
toEpoch := cctx.Int("to")
if toEpoch > fromEpoch {
return fmt.Errorf("to epoch must be less than from epoch")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

errors.New()

}

// Flags
backfill := cctx.Bool("backfill")
failfast := cctx.Bool("failfast")
output := cctx.Bool("output")

// Results Tracking
var results []IndexValidationJSON
var backfilledEpochs []int

log.Infof("Starting %s from epoch: %d to epoch: %d",
func() string {
if backfill {
return "backfill and inspect chainindex"
}
return "inspect chainindex"
}(),
fromEpoch, toEpoch)

// Iterate Over Epochs
for epoch := fromEpoch; epoch >= toEpoch; epoch-- {
if ctx.Err() != nil {
return ctx.Err()
}

indexValidateResp, err := api.ChainValidateIndex(ctx, abi.ChainEpoch(epoch), backfill)
if err != nil {
if failfast {
return fmt.Errorf("failed to validate index for epoch %d: %w", epoch, err)
}
log.Warnf("Error validating index for epoch %d: %v", epoch, err)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

use %w for errors.

continue
}

if !indexValidateResp.TipSetKey.IsEmpty() || indexValidateResp.Height != uint64(epoch) && indexValidateResp.Backfilled == backfill {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I dont think we need this validation block here at all. The ChainIndexValidationAPI guruantees that it will return a valid indexValidateResp if error is nil.

errMsg := fmt.Sprintf("epoch %d: invalid index validation response: %+v", epoch, indexValidateResp)
if failfast {
return fmt.Errorf(errMsg)
}
log.Warn(errMsg)
continue
}

if backfill {
backfilledEpochs = append(backfilledEpochs, epoch)
}

if output {
results = append(results, IndexValidationJSON{
IndexedTipsetKey: indexValidateResp.TipSetKey,
IndexedHeight: indexValidateResp.Height,
IndexedMsgCount: indexValidateResp.IndexedMessagesCount,
IndexedEventsCount: indexValidateResp.IndexedEventsCount,
})
} else {
logEpochResult(epoch, indexValidateResp)
}
}

// Output JSON Results
if output {
if err := outputResults(results); err != nil {
return err
}
}

// Log Summary
if backfill {
log.Infof("Backfilled epochs: %v", backfilledEpochs)
} else {
log.Infof("Inspection of chain index from epoch %d to %d completed.", fromEpoch, toEpoch)
}

return nil
},
}

// outputResults marshals the results into JSON and outputs them.
func outputResults(results []IndexValidationJSON) error {
jsonData, err := json.MarshalIndent(results, "", " ")
if err != nil {
return fmt.Errorf("failed to marshal results to JSON: %w", err)
}
fmt.Println(string(jsonData))
return nil
}

// logEpochResult logs the result of backfilling for a single epoch.
func logEpochResult(epoch int, indexValidate *types.IndexValidation) {
if indexValidate.Backfilled {
log.Infof("Epoch %d: Backfilled successfully. TipsetKey: %s, TotalMessages: %d, TotalEvents: %d",
epoch,
indexValidate.TipSetKey,
indexValidate.IndexedMessagesCount,
indexValidate.IndexedMessagesCount,
)
} else {
log.Info("Epoch %d: validated successfully. TipsetKey: %s, TotalMessages: %d, TotalEvents: %d",
epoch,
indexValidate.TipSetKey,
indexValidate.IndexedMessagesCount,
indexValidate.IndexedMessagesCount,
)
}
}
Loading