Skip to content

Commit

Permalink
compare events AMT root (#12632)
Browse files Browse the repository at this point in the history
  • Loading branch information
aarshkshah1992 authored Oct 23, 2024
1 parent 24698c5 commit 4b6a582
Show file tree
Hide file tree
Showing 5 changed files with 254 additions and 0 deletions.
104 changes: 104 additions & 0 deletions chain/index/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,15 @@ import (
"errors"

"github.com/ipfs/go-cid"
cbor "github.com/ipfs/go-ipld-cbor"
ipld "github.com/ipfs/go-ipld-format"
cbg "github.com/whyrusleeping/cbor-gen"
"golang.org/x/xerrors"

amt4 "github.com/filecoin-project/go-amt-ipld/v4"
"github.com/filecoin-project/go-state-types/abi"

bstore "github.com/filecoin-project/lotus/blockstore"
"github.com/filecoin-project/lotus/chain/types"
)

Expand Down Expand Up @@ -282,6 +286,32 @@ func (si *SqliteIndexer) verifyIndexedData(ctx context.Context, ts *types.TipSet
return xerrors.Errorf("event entries count mismatch for height %d: chainstore has %d, index has %d", ts.Height(), totalEventEntriesCount, indexedData.nonRevertedEventEntriesCount)
}

// compare the events AMT root between the indexed events and the events in the chain state
for _, emsg := range executedMsgs {
indexedRoot, hasEvents, err := si.amtRootForEvents(ctx, tsKeyCid, emsg.msg.Cid())
if err != nil {
return xerrors.Errorf("failed to generate AMT root for indexed events of message %s at height %d: %w", emsg.msg.Cid(), ts.Height(), err)
}

if !hasEvents && emsg.rct.EventsRoot == nil {
// No events in index and no events in receipt, this is fine
continue
}

if hasEvents && emsg.rct.EventsRoot == nil {
return xerrors.Errorf("index corruption: events found in index for message %s at height %d, but message receipt has no events root", emsg.msg.Cid(), ts.Height())
}

if !hasEvents && emsg.rct.EventsRoot != nil {
return xerrors.Errorf("index corruption: no events found in index for message %s at height %d, but message receipt has events root %s", emsg.msg.Cid(), ts.Height(), emsg.rct.EventsRoot)
}

// Both index and receipt have events, compare the roots
if !indexedRoot.Equals(*emsg.rct.EventsRoot) {
return xerrors.Errorf("index corruption: events AMT root mismatch for message %s at height %d. Index root: %s, Receipt root: %s", emsg.msg.Cid(), ts.Height(), indexedRoot, emsg.rct.EventsRoot)
}
}

return nil
}

Expand Down Expand Up @@ -335,3 +365,77 @@ func (si *SqliteIndexer) getNextTipset(ctx context.Context, ts *types.TipSet) (*
func makeBackfillRequiredErr(height abi.ChainEpoch) error {
return xerrors.Errorf("missing tipset at height %d in the chain index, set backfill flag to true to fix", height)
}

// amtRootForEvents generates the events AMT root CID for a given message's events, and returns
// whether the message has events and a fatal error if one occurred.
func (si *SqliteIndexer) amtRootForEvents(
ctx context.Context,
tsKeyCid cid.Cid,
msgCid cid.Cid,
) (cid.Cid, bool, error) {
events := make([]cbg.CBORMarshaler, 0)

err := withTx(ctx, si.db, func(tx *sql.Tx) error {
rows, err := tx.Stmt(si.stmts.getEventIdAndEmitterIdStmt).QueryContext(ctx, tsKeyCid.Bytes(), msgCid.Bytes())
if err != nil {
return xerrors.Errorf("failed to query events: %w", err)
}
defer func() {
_ = rows.Close()
}()

for rows.Next() {
var eventId int
var actorId int64
if err := rows.Scan(&eventId, &actorId); err != nil {
return xerrors.Errorf("failed to scan row: %w", err)
}

event := types.Event{
Emitter: abi.ActorID(actorId),
Entries: make([]types.EventEntry, 0),
}

rows2, err := tx.Stmt(si.stmts.getEventEntriesStmt).QueryContext(ctx, eventId)
if err != nil {
return xerrors.Errorf("failed to query event entries: %w", err)
}
defer func() {
_ = rows2.Close()
}()

for rows2.Next() {
var flags []byte
var key string
var codec uint64
var value []byte
if err := rows2.Scan(&flags, &key, &codec, &value); err != nil {
return xerrors.Errorf("failed to scan row: %w", err)
}
entry := types.EventEntry{
Flags: flags[0],
Key: key,
Codec: codec,
Value: value,
}
event.Entries = append(event.Entries, entry)
}

events = append(events, &event)
}

return nil
})

if err != nil {
return cid.Undef, false, xerrors.Errorf("failed to retrieve events for message %s in tipset %s: %w", msgCid, tsKeyCid, err)
}

// construct the AMT from our slice to an in-memory IPLD store just so we can get the root,
// we don't need the blocks themselves
root, err := amt4.FromArray(ctx, cbor.NewCborStore(bstore.NewMemory()), events, amt4.UseTreeBitWidth(types.EventAMTBitwidth))
if err != nil {
return cid.Undef, false, xerrors.Errorf("failed to create AMT: %w", err)
}
return root, len(events) > 0, nil
}
23 changes: 23 additions & 0 deletions chain/index/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,9 +190,13 @@ func TestBackfillMissingEpoch(t *testing.T) {
fakeMsg := fakeMessage(randomIDAddr(t, rng), randomIDAddr(t, rng))
fakeEvent := fakeEvent(1, []kv{{k: "test", v: []byte("value")}, {k: "test2", v: []byte("value2")}}, nil)

ec := randomCid(t, rng)
executedMsg := executedMessage{
msg: fakeMsg,
evs: []types.Event{*fakeEvent},
rct: types.MessageReceipt{
EventsRoot: &ec,
},
}

cs.SetMessagesForTipset(missingTs, []types.ChainMsg{fakeMsg})
Expand All @@ -214,7 +218,26 @@ func TestBackfillMissingEpoch(t *testing.T) {
require.Equal(t, uint64(2), result.IndexedEventEntriesCount)

// Verify that the epoch is now indexed
// fails as the events root dont match
verificationResult, err := si.ChainValidateIndex(ctx, missingEpoch, false)
require.ErrorContains(t, err, "events AMT root mismatch")
require.Nil(t, verificationResult)

tsKeyCid, err := missingTs.Key().Cid()
require.NoError(t, err)

root, b, err := si.amtRootForEvents(ctx, tsKeyCid, fakeMsg.Cid())
require.NoError(t, err)
require.True(t, b)
executedMsg.rct.EventsRoot = &root
si.setExecutedMessagesLoaderFunc(func(ctx context.Context, cs ChainStore, msgTs, rctTs *types.TipSet) ([]executedMessage, error) {
if msgTs.Height() == missingTs.Height() {
return []executedMessage{executedMsg}, nil
}
return nil, nil
})

verificationResult, err = si.ChainValidateIndex(ctx, missingEpoch, false)
require.NoError(t, err)
require.NotNil(t, verificationResult)
require.False(t, verificationResult.Backfilled)
Expand Down
2 changes: 2 additions & 0 deletions chain/index/ddls.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,5 +105,7 @@ func preparedStatementMapping(ps *preparedStatements) map[**sql.Stmt]string {
&ps.getMsgIdForMsgCidAndTipsetStmt: "SELECT id FROM tipset_message WHERE tipset_key_cid = ? AND message_cid = ? AND reverted = 0 LIMIT 1",
&ps.insertEventStmt: "INSERT INTO event (message_id, event_index, emitter_id, emitter_addr, reverted) VALUES (?, ?, ?, ?, ?)",
&ps.insertEventEntryStmt: "INSERT INTO event_entry (event_id, indexed, flags, key, codec, value) VALUES (?, ?, ?, ?, ?, ?)",
&ps.getEventEntriesStmt: "SELECT flags, key, codec, value FROM event_entry WHERE event_id=? ORDER BY _rowid_ ASC",
&ps.getEventIdAndEmitterIdStmt: "SELECT e.id, e.emitter_id FROM event e JOIN tipset_message tm ON e.message_id = tm.id WHERE tm.tipset_key_cid = ? AND tm.message_cid = ? ORDER BY e.event_index ASC",
}
}
123 changes: 123 additions & 0 deletions chain/index/ddls_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,129 @@ func TestGetNonRevertedTipsetCountStmts(t *testing.T) {
verifyNonRevertedMessageCount(t, s, []byte(tipsetKeyCid1), 0)
}

func TestGetEventIdAndEmitterIdStmtAndGetEventEntriesStmt(t *testing.T) {
s, err := NewSqliteIndexer(":memory:", nil, 0, false, 0)
require.NoError(t, err)

// Insert a tipset message
tsKeyCid := []byte("test_tipset_key")
msgCid := []byte("test_message_cid")
messageID := insertTipsetMessage(t, s, tipsetMessage{
tipsetKeyCid: tsKeyCid,
height: 1,
reverted: false,
messageCid: msgCid,
messageIndex: 0,
})

// Insert events
event1ID := insertEvent(t, s, event{
messageID: messageID,
eventIndex: 0,
emitterId: 1,
emitterAddr: []byte("emitter_addr_1"),
reverted: false,
})
event2ID := insertEvent(t, s, event{
messageID: messageID,
eventIndex: 1,
emitterId: 2,
emitterAddr: []byte("emitter_addr_2"),
reverted: false,
})

// Insert event entries
insertEventEntry(t, s, eventEntry{
eventID: event1ID,
indexed: true,
flags: []byte{0x01},
key: "key1",
codec: 1,
value: []byte("value1"),
})
insertEventEntry(t, s, eventEntry{
eventID: event1ID,
indexed: false,
flags: []byte{0x00},
key: "key2",
codec: 2,
value: []byte("value2"),
})
insertEventEntry(t, s, eventEntry{
eventID: event2ID,
indexed: true,
flags: []byte{0x01},
key: "key3",
codec: 3,
value: []byte("value3"),
})

// Test getEventIdAndEmitterIdStmt
rows, err := s.stmts.getEventIdAndEmitterIdStmt.Query(tsKeyCid, msgCid)
require.NoError(t, err)
defer func() {
_ = rows.Close()
}()
var eventIDs []int64
var emitterIDs []uint64
for rows.Next() {
var eventID int64
var emitterID uint64
err := rows.Scan(&eventID, &emitterID)
require.NoError(t, err)
eventIDs = append(eventIDs, eventID)
emitterIDs = append(emitterIDs, emitterID)
}
require.NoError(t, rows.Err())
require.Equal(t, []int64{event1ID, event2ID}, eventIDs)
require.Equal(t, []uint64{1, 2}, emitterIDs)

// Test getEventEntriesStmt for event1
rows, err = s.stmts.getEventEntriesStmt.Query(event1ID)
require.NoError(t, err)
defer func() {
_ = rows.Close()
}()

var entries []eventEntry
for rows.Next() {
var entry eventEntry
err := rows.Scan(&entry.flags, &entry.key, &entry.codec, &entry.value)
require.NoError(t, err)
entries = append(entries, entry)
}
require.NoError(t, rows.Err())
require.Len(t, entries, 2)
require.Equal(t, []byte{0x01}, entries[0].flags)
require.Equal(t, "key1", entries[0].key)
require.Equal(t, 1, entries[0].codec)
require.Equal(t, []byte("value1"), entries[0].value)
require.Equal(t, []byte{0x00}, entries[1].flags)
require.Equal(t, "key2", entries[1].key)
require.Equal(t, 2, entries[1].codec)
require.Equal(t, []byte("value2"), entries[1].value)

// Test getEventEntriesStmt for event2
rows, err = s.stmts.getEventEntriesStmt.Query(event2ID)
require.NoError(t, err)
defer func() {
_ = rows.Close()
}()

entries = nil
for rows.Next() {
var entry eventEntry
err := rows.Scan(&entry.flags, &entry.key, &entry.codec, &entry.value)
require.NoError(t, err)
entries = append(entries, entry)
}
require.NoError(t, rows.Err())
require.Len(t, entries, 1)
require.Equal(t, []byte{0x01}, entries[0].flags)
require.Equal(t, "key3", entries[0].key)
require.Equal(t, 3, entries[0].codec)
require.Equal(t, []byte("value3"), entries[0].value)
}
func TestUpdateTipsetToNonRevertedStmt(t *testing.T) {
s, err := NewSqliteIndexer(":memory:", nil, 0, false, 0)
require.NoError(t, err)
Expand Down
2 changes: 2 additions & 0 deletions chain/index/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ type preparedStatements struct {
getMsgIdForMsgCidAndTipsetStmt *sql.Stmt
insertEventStmt *sql.Stmt
insertEventEntryStmt *sql.Stmt
getEventIdAndEmitterIdStmt *sql.Stmt
getEventEntriesStmt *sql.Stmt

hasNullRoundAtHeightStmt *sql.Stmt
getNonRevertedTipsetAtHeightStmt *sql.Stmt
Expand Down

0 comments on commit 4b6a582

Please sign in to comment.