Skip to content

Commit

Permalink
Address PR feedback and move each epoch inside own tx
Browse files Browse the repository at this point in the history
  • Loading branch information
fridrik01 committed Jul 24, 2023
1 parent 8a5950b commit dafa837
Showing 1 changed file with 103 additions and 95 deletions.
198 changes: 103 additions & 95 deletions cmd/lotus-shed/indexes.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import (
builtintypes "github.com/filecoin-project/go-state-types/builtin"
"github.com/filecoin-project/go-state-types/crypto"
"github.com/filecoin-project/go-state-types/exitcode"

lapi "github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/chain/types/ethtypes"
lcli "github.com/filecoin-project/lotus/cli"
Expand Down Expand Up @@ -106,23 +106,6 @@ var backfillEventsCmd = &cli.Command{
}
}()

stmtSelectEvent, err := db.Prepare("SELECT MAX(id) from event WHERE height=? AND tipset_key=? and tipset_key_cid=? and emitter_addr=? and event_index=? and message_cid=? and message_index=? and reverted=false")
if err != nil {
return err
}
stmtSelectEntry, err := db.Prepare("SELECT EXISTS(SELECT 1 from event_entry WHERE event_id=? and indexed=? and flags=? and key=? and codec=? and value=?)")
if err != nil {
return err
}
stmtEvent, err := db.Prepare("INSERT INTO event (height, tipset_key, tipset_key_cid, emitter_addr, event_index, message_cid, message_index, reverted) VALUES(?, ?, ?, ?, ?, ?, ?, ?)")
if err != nil {
return err
}
stmtEntry, err := db.Prepare("INSERT INTO event_entry(event_id, indexed, flags, key, codec, value) VALUES(?, ?, ?, ?, ?, ?)")
if err != nil {
return err
}

addressLookups := make(map[abi.ActorID]address.Address)

resolveFn := func(ctx context.Context, emitter abi.ActorID, ts *types.TipSet) (address.Address, bool) {
Expand Down Expand Up @@ -156,35 +139,32 @@ var backfillEventsCmd = &cli.Command{
return b&(types.EventFlagIndexedKey|types.EventFlagIndexedValue) > 0
}

var eventsAffected int64
var entriesAffected int64
for i := 0; i < epochs; i++ {
select {
case <-ctx.Done():
return nil
default:
}

log.Infof("[%d] backfilling actor events epoch:%d, eventsAffected:%d, entriesAffected:%d", i, currTs.Height(), eventsAffected, entriesAffected)
var totalEventsAffected int64
var totalEntriesAffected int64

blockCid := prevTs.Blocks()[0].Cid()

// get messages for the parent of the previous tipset (which will be currTs)
msgs, err := api.ChainGetParentMessages(ctx, blockCid)
processHeight := func(ctx context.Context, cnt int, msgs []lapi.Message, receipts []*types.MessageReceipt) error {
tx, err := db.BeginTx(ctx, nil)
if err != nil {
return fmt.Errorf("failed to get parent messages for block %s: %w", blockCid, err)
return fmt.Errorf("failed to start transaction: %w", err)
}
defer tx.Rollback() //nolint:errcheck

// get receipts for the parent of the previous tipset (which will be currTs)
receipts, err := api.ChainGetParentReceipts(ctx, blockCid)
stmtSelectEvent, err := tx.Prepare("SELECT MAX(id) from event WHERE height=? AND tipset_key=? and tipset_key_cid=? and emitter_addr=? and event_index=? and message_cid=? and message_index=? and reverted=false")
if err != nil {
return fmt.Errorf("failed to get parent receipts for block %s: %w", blockCid, err)
return err
}

if len(msgs) != len(receipts) {
return fmt.Errorf("mismatched in message and receipt count: %d != %d", len(msgs), len(receipts))
stmtEvent, err := tx.Prepare("INSERT INTO event (height, tipset_key, tipset_key_cid, emitter_addr, event_index, message_cid, message_index, reverted) VALUES(?, ?, ?, ?, ?, ?, ?, ?)")
if err != nil {
return err
}
stmtEntry, err := tx.Prepare("INSERT INTO event_entry(event_id, indexed, flags, key, codec, value) VALUES(?, ?, ?, ?, ?, ?)")
if err != nil {
return err
}

var eventsAffected int64
var entriesAffected int64

// loop over each message receipt and backfill the events
for idx, receipt := range receipts {
msg := msgs[idx]
Expand Down Expand Up @@ -234,75 +214,103 @@ var backfillEventsCmd = &cli.Command{
return fmt.Errorf("error checking if event exists: %w", err)
}

if !entryID.Valid {
// event does not exist, lets backfill it
res, err := stmtEvent.Exec(
currTs.Height(), // height
currTs.Key().Bytes(), // tipset_key
tsKeyCid.Bytes(), // tipset_key_cid
addr.Bytes(), // emitter_addr
eventIdx, // event_index
msg.Cid.Bytes(), // message_cid
idx, // message_index
false, // reverted
)

if err != nil {
return fmt.Errorf("error inserting event: %w", err)
}
// we already have this event
if entryID.Valid {
continue
}

entryID.Int64, err = res.LastInsertId()
if err != nil {
return fmt.Errorf("could not get last insert id: %w", err)
}
// event does not exist, lets backfill it
res, err := tx.Stmt(stmtEvent).Exec(
currTs.Height(), // height
currTs.Key().Bytes(), // tipset_key
tsKeyCid.Bytes(), // tipset_key_cid
addr.Bytes(), // emitter_addr
eventIdx, // event_index
msg.Cid.Bytes(), // message_cid
idx, // message_index
false, // reverted
)
if err != nil {
return fmt.Errorf("error inserting event: %w", err)
}

rowsAffected, err := res.RowsAffected()
if err != nil {
return fmt.Errorf("error getting rows affected: %s", err)
}
entryID.Int64, err = res.LastInsertId()
if err != nil {
return fmt.Errorf("could not get last insert id: %w", err)
}

eventsAffected += rowsAffected
rowsAffected, err := res.RowsAffected()
if err != nil {
return fmt.Errorf("could not get rows affected: %w", err)
}
eventsAffected += rowsAffected

// backfill the event entries
for _, entry := range event.Entries {
// check if entry exists
var exists bool
err = stmtSelectEntry.QueryRow(
entryID.Int64,
isIndexedValue(entry.Flags),
[]byte{entry.Flags},
entry.Key,
entry.Codec,
entry.Value,
).Scan(&exists)
_, err := tx.Stmt(stmtEntry).Exec(
entryID.Int64, // event_id
isIndexedValue(entry.Flags), // indexed
[]byte{entry.Flags}, // flags
entry.Key, // key
entry.Codec, // codec
entry.Value, // value
)
if err != nil {
return fmt.Errorf("error checking if entry exists: %w", err)
return fmt.Errorf("error inserting entry: %w", err)
}

if !exists {
// entry does not exist, lets backfill it
res, err := stmtEntry.Exec(
entryID.Int64, // event_id
isIndexedValue(entry.Flags), // indexed
[]byte{entry.Flags}, // flags
entry.Key, // key
entry.Codec, // codec
entry.Value, // value
)
if err != nil {
return fmt.Errorf("error inserting entry: %w", err)
}

rowsAffected, err := res.RowsAffected()
if err != nil {
return fmt.Errorf("error getting rows affected: %s", err)
}
entriesAffected += rowsAffected
rowsAffected, err := res.RowsAffected()
if err != nil {
return fmt.Errorf("could not get rows affected: %w", err)
}
entriesAffected += rowsAffected
}
}
}

err = tx.Commit()
if err != nil {
return fmt.Errorf("failed to commit transaction: %w", err)
}

log.Infof("[%d] backfilling actor events epoch:%d, eventsAffected:%d, entriesAffected:%d", cnt, currTs.Height(), eventsAffected, entriesAffected)

totalEventsAffected += eventsAffected
totalEntriesAffected += entriesAffected

return nil
}

for i := 0; i < epochs; i++ {
select {
case <-ctx.Done():
return nil
default:
}

blockCid := prevTs.Blocks()[0].Cid()

// get messages for the parent of the previous tipset (which will be currTs)
msgs, err := api.ChainGetParentMessages(ctx, blockCid)
if err != nil {
return fmt.Errorf("failed to get parent messages for block %s: %w", blockCid, err)
}

// get receipts for the parent of the previous tipset (which will be currTs)
receipts, err := api.ChainGetParentReceipts(ctx, blockCid)
if err != nil {
return fmt.Errorf("failed to get parent receipts for block %s: %w", blockCid, err)
}

if len(msgs) != len(receipts) {
return fmt.Errorf("mismatched in message and receipt count: %d != %d", len(msgs), len(receipts))
}

err = processHeight(ctx, i, msgs, receipts)
if err != nil {
return err
}

// advance prevTs and currTs up the chain
prevTs = currTs
currTs, err = api.ChainGetTipSet(ctx, currTs.Parents())
Expand All @@ -311,7 +319,7 @@ var backfillEventsCmd = &cli.Command{
}
}

log.Infof("backfilling events complete, eventsAffected:%d, entriesAffected:%d", eventsAffected, entriesAffected)
log.Infof("backfilling events complete, totalEventsAffected:%d, totalEntriesAffected:%d", totalEventsAffected, totalEntriesAffected)

return nil
},
Expand Down

0 comments on commit dafa837

Please sign in to comment.