Skip to content

Commit

Permalink
Add new lotus-shed command for backfillling actor events
Browse files Browse the repository at this point in the history
  • Loading branch information
fridrik01 committed Jul 20, 2023
1 parent 2ddb52e commit ed3a508
Showing 1 changed file with 283 additions and 0 deletions.
283 changes: 283 additions & 0 deletions cmd/lotus-shed/indexes.go
Original file line number Diff line number Diff line change
@@ -1,19 +1,25 @@
package main

import (
"context"
"database/sql"
"fmt"
"path"
"path/filepath"
"strings"

"github.com/mitchellh/go-homedir"
"github.com/multiformats/go-varint"
"github.com/urfave/cli/v2"
"golang.org/x/xerrors"

"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-state-types/abi"
builtintypes "github.com/filecoin-project/go-state-types/builtin"
"github.com/filecoin-project/go-state-types/crypto"
"github.com/filecoin-project/go-state-types/exitcode"

"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/chain/types/ethtypes"
lcli "github.com/filecoin-project/lotus/cli"
)
Expand All @@ -31,6 +37,283 @@ var indexesCmd = &cli.Command{
withCategory("msgindex", backfillMsgIndexCmd),
withCategory("msgindex", pruneMsgIndexCmd),
withCategory("txhash", backfillTxHashCmd),
withCategory("events", backfillEventsCmd),
},
}

var backfillEventsCmd = &cli.Command{
Name: "backfill-events",
Usage: "Backfill the events.db for a number of epochs starting from a specified height",
Flags: []cli.Flag{
&cli.UintFlag{
Name: "from",
Value: 0,
Usage: "the tipset height to start backfilling from (0 is head of chain)",
},
&cli.IntFlag{
Name: "epochs",
Value: 2000,
Usage: "the number of epochs to backfill",
},
},
Action: func(cctx *cli.Context) error {
srv, err := lcli.GetFullNodeServices(cctx)
if err != nil {
return err
}
defer srv.Close() //nolint:errcheck

api := srv.FullNodeAPI()
ctx := lcli.ReqContext(cctx)

// currTs will be the tipset where we start backfilling from
currTs, err := api.ChainHead(ctx)
if err != nil {
return err
}
if cctx.IsSet("from") {
// we need to fetch the tipset after the epoch being specified since we will need to advance currTs
currTs, err = api.ChainGetTipSetByHeight(ctx, abi.ChainEpoch(cctx.Int("from")+1), currTs.Key())
if err != nil {
return err
}
}

// advance currTs by one epoch and maintain prevTs as the previous tipset (this allows us to easily use the ChainGetParentMessages/Receipt API)
prevTs := currTs
currTs, err = api.ChainGetTipSet(ctx, currTs.Parents())
if err != nil {
return fmt.Errorf("failed to load tipset %s: %w", prevTs.Parents(), err)
}

epochs := cctx.Int("epochs")

basePath, err := homedir.Expand(cctx.String("repo"))
if err != nil {
return err
}

dbPath := path.Join(basePath, "sqlite", "events.db")
db, err := sql.Open("sqlite3", dbPath)
if err != nil {
return err
}

defer func() {
err := db.Close()
if err != nil {
fmt.Printf("ERROR: closing db: %s", err)
}
}()

stmtSelectEvent, err := db.Prepare("SELECT MAX(id) from event WHERE height=? AND tipset_key=? and tipset_key_cid=? and emitter_addr=? and event_index=? and message_cid=? and message_index=? and reverted=false")
if err != nil {
return err
}
stmtSelectEntry, err := db.Prepare("SELECT EXISTS(SELECT 1 from event_entry WHERE event_id=? and indexed=? and flags=? and key=? and codec=? and value=?)")
if err != nil {
return err
}
stmtEvent, err := db.Prepare("INSERT INTO event (height, tipset_key, tipset_key_cid, emitter_addr, event_index, message_cid, message_index, reverted) VALUES(?, ?, ?, ?, ?, ?, ?, ?)")
if err != nil {
return err
}
stmtEntry, err := db.Prepare("INSERT INTO event_entry(event_id, indexed, flags, key, codec, value) VALUES(?, ?, ?, ?, ?, ?)")
if err != nil {
return err
}

addressLookups := make(map[abi.ActorID]address.Address)

resolveFn := func(ctx context.Context, emitter abi.ActorID, ts *types.TipSet) (address.Address, bool) {
// we only want to match using f4 addresses
idAddr, err := address.NewIDAddress(uint64(emitter))
if err != nil {
return address.Undef, false
}

actor, err := api.StateGetActor(ctx, idAddr, ts.Key())
if err != nil || actor.Address == nil {
return address.Undef, false
}

// if robust address is not f4 then we won't match against it so bail early
if actor.Address.Protocol() != address.Delegated {
return address.Undef, false
}

// we have an f4 address, make sure it's assigned by the EAM
if namespace, _, err := varint.FromUvarint(actor.Address.Payload()); err != nil || namespace != builtintypes.EthereumAddressManagerActorID {
return address.Undef, false
}
return *actor.Address, true
}

isIndexedValue := func(b uint8) bool {
// currently we mark the full entry as indexed if either the key
// or the value are indexed; in the future we will need finer-grained
// management of indices
return b&(types.EventFlagIndexedKey|types.EventFlagIndexedValue) > 0
}

var eventsAffected int64
var entriesAffected int64
for i := 0; i < epochs; i++ {
select {
case <-ctx.Done():
return nil
default:
}

log.Infof("[%d] backfilling actor events epoch:%d, eventsAffected:%d, entriesAffected:%d", i, currTs.Height(), eventsAffected, entriesAffected)

blockCid := prevTs.Blocks()[0].Cid()

// get messages for the parent of the previous tipset (which will be currTs)
msgs, err := api.ChainGetParentMessages(ctx, blockCid)
if err != nil {
return fmt.Errorf("failed to get parent messages for block %s: %w", blockCid, err)
}

// get receipts for the parent of the previous tipset (which will be currTs)
receipts, err := api.ChainGetParentReceipts(ctx, blockCid)
if err != nil {
return fmt.Errorf("failed to get parent receipts for block %s: %w", blockCid, err)
}

if len(msgs) != len(receipts) {
return fmt.Errorf("mismatched in message and receipt count: %d != %d", len(msgs), len(receipts))
}

// loop over each message receipt and backfill the events
for idx, receipt := range receipts {
msg := msgs[idx]

if receipt.ExitCode != exitcode.Ok {
continue
}

if receipt.EventsRoot == nil {
continue
}

events, err := api.ChainGetEvents(ctx, *receipt.EventsRoot)
if err != nil {
return fmt.Errorf("failed to load events for tipset %s: %w", currTs, err)
}

for eventIdx, event := range events {
addr, found := addressLookups[event.Emitter]
if !found {
var ok bool
addr, ok = resolveFn(ctx, event.Emitter, currTs)
if !ok {
// not an address we will be able to match against
continue
}
addressLookups[event.Emitter] = addr
}

tsKeyCid, err := currTs.Key().Cid()
if err != nil {
return fmt.Errorf("failed to get tipset key cid: %w", err)
}

// select the highest event id that exists in database, or null if none exists
var entryID sql.NullInt64
err = stmtSelectEvent.QueryRow(
currTs.Height(),
currTs.Key().Bytes(),
tsKeyCid.Bytes(),
addr.Bytes(),
eventIdx,
msg.Cid.Bytes(),
idx,
).Scan(&entryID)
if err != nil {
return fmt.Errorf("error checking if event exists: %w", err)
}

if !entryID.Valid {
// event does not exist, lets backfill it
res, err := stmtEvent.Exec(
currTs.Height(), // height
currTs.Key().Bytes(), // tipset_key
tsKeyCid.Bytes(), // tipset_key_cid
addr.Bytes(), // emitter_addr
eventIdx, // event_index
msg.Cid.Bytes(), // message_cid
idx, // message_index
false, // reverted
)

if err != nil {
return fmt.Errorf("error inserting event: %w", err)
}

entryID.Int64, err = res.LastInsertId()
if err != nil {
return fmt.Errorf("could not get last insert id: %w", err)
}

rowsAffected, err := res.RowsAffected()
if err != nil {
return fmt.Errorf("error getting rows affected: %s", err)
}

eventsAffected += rowsAffected
}

for _, entry := range event.Entries {
// check if entry exists
var exists bool
err = stmtSelectEntry.QueryRow(
entryID.Int64,
isIndexedValue(entry.Flags),
[]byte{entry.Flags},
entry.Key,
entry.Codec,
entry.Value,
).Scan(&exists)
if err != nil {
return fmt.Errorf("error checking if entry exists: %w", err)
}

if !exists {
// entry does not exist, lets backfill it
res, err := stmtEntry.Exec(
entryID.Int64, // event_id
isIndexedValue(entry.Flags), // indexed
[]byte{entry.Flags}, // flags
entry.Key, // key
entry.Codec, // codec
entry.Value, // value
)
if err != nil {
return fmt.Errorf("error inserting entry: %w", err)
}

rowsAffected, err := res.RowsAffected()
if err != nil {
return fmt.Errorf("error getting rows affected: %s", err)
}
entriesAffected += rowsAffected
}
}
}
}

// advance prevTs and currTs up the chain
prevTs = currTs
currTs, err = api.ChainGetTipSet(ctx, currTs.Parents())
if err != nil {
return fmt.Errorf("failed to load tipset %s: %w", currTs, err)
}
}

log.Infof("backfilling events complete, eventsAffected:%d, entriesAffected:%d", eventsAffected, entriesAffected)

return nil
},
}

Expand Down

0 comments on commit ed3a508

Please sign in to comment.