diff --git a/chain/index/api.go b/chain/index/api.go index 9588383ea1..5262a20032 100644 --- a/chain/index/api.go +++ b/chain/index/api.go @@ -6,11 +6,15 @@ import ( "errors" "github.com/ipfs/go-cid" + cbor "github.com/ipfs/go-ipld-cbor" ipld "github.com/ipfs/go-ipld-format" + cbg "github.com/whyrusleeping/cbor-gen" "golang.org/x/xerrors" + amt4 "github.com/filecoin-project/go-amt-ipld/v4" "github.com/filecoin-project/go-state-types/abi" + bstore "github.com/filecoin-project/lotus/blockstore" "github.com/filecoin-project/lotus/chain/types" ) @@ -282,6 +286,32 @@ func (si *SqliteIndexer) verifyIndexedData(ctx context.Context, ts *types.TipSet return xerrors.Errorf("event entries count mismatch for height %d: chainstore has %d, index has %d", ts.Height(), totalEventEntriesCount, indexedData.nonRevertedEventEntriesCount) } + // compare the events AMT root between the indexed events and the events in the chain state + for _, emsg := range executedMsgs { + indexedRoot, hasEvents, err := si.amtRootForEvents(ctx, tsKeyCid, emsg.msg.Cid()) + if err != nil { + return xerrors.Errorf("failed to generate AMT root for indexed events of message %s at height %d: %w", emsg.msg.Cid(), ts.Height(), err) + } + + if !hasEvents && emsg.rct.EventsRoot == nil { + // No events in index and no events in receipt, this is fine + continue + } + + if hasEvents && emsg.rct.EventsRoot == nil { + return xerrors.Errorf("index corruption: events found in index for message %s at height %d, but message receipt has no events root", emsg.msg.Cid(), ts.Height()) + } + + if !hasEvents && emsg.rct.EventsRoot != nil { + return xerrors.Errorf("index corruption: no events found in index for message %s at height %d, but message receipt has events root %s", emsg.msg.Cid(), ts.Height(), emsg.rct.EventsRoot) + } + + // Both index and receipt have events, compare the roots + if !indexedRoot.Equals(*emsg.rct.EventsRoot) { + return xerrors.Errorf("index corruption: events AMT root mismatch for message %s at height %d. Index root: %s, Receipt root: %s", emsg.msg.Cid(), ts.Height(), indexedRoot, emsg.rct.EventsRoot) + } + } + return nil } @@ -335,3 +365,77 @@ func (si *SqliteIndexer) getNextTipset(ctx context.Context, ts *types.TipSet) (* func makeBackfillRequiredErr(height abi.ChainEpoch) error { return xerrors.Errorf("missing tipset at height %d in the chain index, set backfill flag to true to fix", height) } + +// amtRootForEvents generates the events AMT root CID for a given message's events, and returns +// whether the message has events and a fatal error if one occurred. +func (si *SqliteIndexer) amtRootForEvents( + ctx context.Context, + tsKeyCid cid.Cid, + msgCid cid.Cid, +) (cid.Cid, bool, error) { + events := make([]cbg.CBORMarshaler, 0) + + err := withTx(ctx, si.db, func(tx *sql.Tx) error { + rows, err := tx.Stmt(si.stmts.getEventIdAndEmitterIdStmt).QueryContext(ctx, tsKeyCid.Bytes(), msgCid.Bytes()) + if err != nil { + return xerrors.Errorf("failed to query events: %w", err) + } + defer func() { + _ = rows.Close() + }() + + for rows.Next() { + var eventId int + var actorId int64 + if err := rows.Scan(&eventId, &actorId); err != nil { + return xerrors.Errorf("failed to scan row: %w", err) + } + + event := types.Event{ + Emitter: abi.ActorID(actorId), + Entries: make([]types.EventEntry, 0), + } + + rows2, err := tx.Stmt(si.stmts.getEventEntriesStmt).QueryContext(ctx, eventId) + if err != nil { + return xerrors.Errorf("failed to query event entries: %w", err) + } + defer func() { + _ = rows2.Close() + }() + + for rows2.Next() { + var flags []byte + var key string + var codec uint64 + var value []byte + if err := rows2.Scan(&flags, &key, &codec, &value); err != nil { + return xerrors.Errorf("failed to scan row: %w", err) + } + entry := types.EventEntry{ + Flags: flags[0], + Key: key, + Codec: codec, + Value: value, + } + event.Entries = append(event.Entries, entry) + } + + events = append(events, &event) + } + + return nil + }) + + if err != nil { + return cid.Undef, false, xerrors.Errorf("failed to retrieve events for message %s in tipset %s: %w", msgCid, tsKeyCid, err) + } + + // construct the AMT from our slice to an in-memory IPLD store just so we can get the root, + // we don't need the blocks themselves + root, err := amt4.FromArray(ctx, cbor.NewCborStore(bstore.NewMemory()), events, amt4.UseTreeBitWidth(types.EventAMTBitwidth)) + if err != nil { + return cid.Undef, false, xerrors.Errorf("failed to create AMT: %w", err) + } + return root, len(events) > 0, nil +} diff --git a/chain/index/api_test.go b/chain/index/api_test.go index f8c7085cd2..93af0eeee9 100644 --- a/chain/index/api_test.go +++ b/chain/index/api_test.go @@ -190,9 +190,13 @@ func TestBackfillMissingEpoch(t *testing.T) { fakeMsg := fakeMessage(randomIDAddr(t, rng), randomIDAddr(t, rng)) fakeEvent := fakeEvent(1, []kv{{k: "test", v: []byte("value")}, {k: "test2", v: []byte("value2")}}, nil) + ec := randomCid(t, rng) executedMsg := executedMessage{ msg: fakeMsg, evs: []types.Event{*fakeEvent}, + rct: types.MessageReceipt{ + EventsRoot: &ec, + }, } cs.SetMessagesForTipset(missingTs, []types.ChainMsg{fakeMsg}) @@ -214,7 +218,26 @@ func TestBackfillMissingEpoch(t *testing.T) { require.Equal(t, uint64(2), result.IndexedEventEntriesCount) // Verify that the epoch is now indexed + // fails as the events root dont match verificationResult, err := si.ChainValidateIndex(ctx, missingEpoch, false) + require.ErrorContains(t, err, "events AMT root mismatch") + require.Nil(t, verificationResult) + + tsKeyCid, err := missingTs.Key().Cid() + require.NoError(t, err) + + root, b, err := si.amtRootForEvents(ctx, tsKeyCid, fakeMsg.Cid()) + require.NoError(t, err) + require.True(t, b) + executedMsg.rct.EventsRoot = &root + si.setExecutedMessagesLoaderFunc(func(ctx context.Context, cs ChainStore, msgTs, rctTs *types.TipSet) ([]executedMessage, error) { + if msgTs.Height() == missingTs.Height() { + return []executedMessage{executedMsg}, nil + } + return nil, nil + }) + + verificationResult, err = si.ChainValidateIndex(ctx, missingEpoch, false) require.NoError(t, err) require.NotNil(t, verificationResult) require.False(t, verificationResult.Backfilled) diff --git a/chain/index/ddls.go b/chain/index/ddls.go index 6a4f0d4034..056f45c592 100644 --- a/chain/index/ddls.go +++ b/chain/index/ddls.go @@ -105,5 +105,7 @@ func preparedStatementMapping(ps *preparedStatements) map[**sql.Stmt]string { &ps.getMsgIdForMsgCidAndTipsetStmt: "SELECT id FROM tipset_message WHERE tipset_key_cid = ? AND message_cid = ? AND reverted = 0 LIMIT 1", &ps.insertEventStmt: "INSERT INTO event (message_id, event_index, emitter_id, emitter_addr, reverted) VALUES (?, ?, ?, ?, ?)", &ps.insertEventEntryStmt: "INSERT INTO event_entry (event_id, indexed, flags, key, codec, value) VALUES (?, ?, ?, ?, ?, ?)", + &ps.getEventEntriesStmt: "SELECT flags, key, codec, value FROM event_entry WHERE event_id=? ORDER BY _rowid_ ASC", + &ps.getEventIdAndEmitterIdStmt: "SELECT e.id, e.emitter_id FROM event e JOIN tipset_message tm ON e.message_id = tm.id WHERE tm.tipset_key_cid = ? AND tm.message_cid = ? ORDER BY e.event_index ASC", } } diff --git a/chain/index/ddls_test.go b/chain/index/ddls_test.go index e9673a440f..e71db1a8cd 100644 --- a/chain/index/ddls_test.go +++ b/chain/index/ddls_test.go @@ -144,6 +144,129 @@ func TestGetNonRevertedTipsetCountStmts(t *testing.T) { verifyNonRevertedMessageCount(t, s, []byte(tipsetKeyCid1), 0) } +func TestGetEventIdAndEmitterIdStmtAndGetEventEntriesStmt(t *testing.T) { + s, err := NewSqliteIndexer(":memory:", nil, 0, false, 0) + require.NoError(t, err) + + // Insert a tipset message + tsKeyCid := []byte("test_tipset_key") + msgCid := []byte("test_message_cid") + messageID := insertTipsetMessage(t, s, tipsetMessage{ + tipsetKeyCid: tsKeyCid, + height: 1, + reverted: false, + messageCid: msgCid, + messageIndex: 0, + }) + + // Insert events + event1ID := insertEvent(t, s, event{ + messageID: messageID, + eventIndex: 0, + emitterId: 1, + emitterAddr: []byte("emitter_addr_1"), + reverted: false, + }) + event2ID := insertEvent(t, s, event{ + messageID: messageID, + eventIndex: 1, + emitterId: 2, + emitterAddr: []byte("emitter_addr_2"), + reverted: false, + }) + + // Insert event entries + insertEventEntry(t, s, eventEntry{ + eventID: event1ID, + indexed: true, + flags: []byte{0x01}, + key: "key1", + codec: 1, + value: []byte("value1"), + }) + insertEventEntry(t, s, eventEntry{ + eventID: event1ID, + indexed: false, + flags: []byte{0x00}, + key: "key2", + codec: 2, + value: []byte("value2"), + }) + insertEventEntry(t, s, eventEntry{ + eventID: event2ID, + indexed: true, + flags: []byte{0x01}, + key: "key3", + codec: 3, + value: []byte("value3"), + }) + + // Test getEventIdAndEmitterIdStmt + rows, err := s.stmts.getEventIdAndEmitterIdStmt.Query(tsKeyCid, msgCid) + require.NoError(t, err) + defer func() { + _ = rows.Close() + }() + var eventIDs []int64 + var emitterIDs []uint64 + for rows.Next() { + var eventID int64 + var emitterID uint64 + err := rows.Scan(&eventID, &emitterID) + require.NoError(t, err) + eventIDs = append(eventIDs, eventID) + emitterIDs = append(emitterIDs, emitterID) + } + require.NoError(t, rows.Err()) + require.Equal(t, []int64{event1ID, event2ID}, eventIDs) + require.Equal(t, []uint64{1, 2}, emitterIDs) + + // Test getEventEntriesStmt for event1 + rows, err = s.stmts.getEventEntriesStmt.Query(event1ID) + require.NoError(t, err) + defer func() { + _ = rows.Close() + }() + + var entries []eventEntry + for rows.Next() { + var entry eventEntry + err := rows.Scan(&entry.flags, &entry.key, &entry.codec, &entry.value) + require.NoError(t, err) + entries = append(entries, entry) + } + require.NoError(t, rows.Err()) + require.Len(t, entries, 2) + require.Equal(t, []byte{0x01}, entries[0].flags) + require.Equal(t, "key1", entries[0].key) + require.Equal(t, 1, entries[0].codec) + require.Equal(t, []byte("value1"), entries[0].value) + require.Equal(t, []byte{0x00}, entries[1].flags) + require.Equal(t, "key2", entries[1].key) + require.Equal(t, 2, entries[1].codec) + require.Equal(t, []byte("value2"), entries[1].value) + + // Test getEventEntriesStmt for event2 + rows, err = s.stmts.getEventEntriesStmt.Query(event2ID) + require.NoError(t, err) + defer func() { + _ = rows.Close() + }() + + entries = nil + for rows.Next() { + var entry eventEntry + err := rows.Scan(&entry.flags, &entry.key, &entry.codec, &entry.value) + require.NoError(t, err) + entries = append(entries, entry) + } + require.NoError(t, rows.Err()) + require.Len(t, entries, 1) + require.Equal(t, []byte{0x01}, entries[0].flags) + require.Equal(t, "key3", entries[0].key) + require.Equal(t, 3, entries[0].codec) + require.Equal(t, []byte("value3"), entries[0].value) +} func TestUpdateTipsetToNonRevertedStmt(t *testing.T) { s, err := NewSqliteIndexer(":memory:", nil, 0, false, 0) require.NoError(t, err) diff --git a/chain/index/indexer.go b/chain/index/indexer.go index 5d5e6fbca9..10ea4fc9c8 100644 --- a/chain/index/indexer.go +++ b/chain/index/indexer.go @@ -45,6 +45,8 @@ type preparedStatements struct { getMsgIdForMsgCidAndTipsetStmt *sql.Stmt insertEventStmt *sql.Stmt insertEventEntryStmt *sql.Stmt + getEventIdAndEmitterIdStmt *sql.Stmt + getEventEntriesStmt *sql.Stmt hasNullRoundAtHeightStmt *sql.Stmt getNonRevertedTipsetAtHeightStmt *sql.Stmt