From 7d9e21ea7b3bc847f5acd9c0192e6bc9ab29e57d Mon Sep 17 00:00:00 2001 From: Rod Vagg Date: Thu, 18 Jul 2024 21:00:02 +1000 Subject: [PATCH 1/2] fix: events: address sqlite index selection performance regressions Fixes: https://github.com/filecoin-project/lotus/issues/12255 SQLite was found to be avoiding the intended initial indexes for some types of complex queries and opting for minor indexes which don't narrow down the search space enough. Specifically we want queries to first either narrow by height or tipset_key_cid and then apply other criteria. Having alternative indexes when a query such as `height>=X AND height<=Y` is encountered causes SQLite to avoid the height index entirely. By removing additional indexes that could be used during the main query path (prefillFilter), we force SQLite to use the intended indexes first and then narrow the results without the use of further indexes. --- CHANGELOG.md | 2 +- chain/events/filter/index.go | 100 ++++++++++++++++-------- chain/events/filter/index_migrations.go | 54 +++++++++---- 3 files changed, 107 insertions(+), 49 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index d1b9401f1fd..2c2ae81a51a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,7 +9,7 @@ ## ☢️ Upgrade Warnings ☢️ -- This Lotus release includes some correctness improvements to the events subsystem, impacting RPC APIs including `GetActorEventsRaw`, `SubscribeActorEventsRaw`, `eth_getLogs` and the `eth` filter APIs. Part of these improvements involve an events database migration that may take some time to complete on nodes with extensive event databases. See [filecoin-project/lotus#12080](https://github.com/filecoin-project/lotus/pull/12080) for details. +- This Lotus release includes some performance improvements to the events subsystem, impacting RPC APIs including `GetActorEventsRaw`, `SubscribeActorEventsRaw`, `eth_getLogs` and the `eth` filter APIs.
Part of these improvements involve an events database migration that may take some time to complete on nodes with extensive event databases. See [filecoin-project/lotus#12261](https://github.com/filecoin-project/lotus/pull/12261) for details. - Breaking change in public APIs `storage/pipeline.NewPreCommitBatcher`, `sealing.NewCommitBatcher` and `storage/pipeline.New`. They now have an additional error return to deal with errors arising from fetching the sealing config. diff --git a/chain/events/filter/index.go b/chain/events/filter/index.go index cf533cec3e4..bec3888b16c 100644 --- a/chain/events/filter/index.go +++ b/chain/events/filter/index.go @@ -39,10 +39,8 @@ var ddls = []string{ reverted INTEGER NOT NULL )`, - createIndexEventEmitterAddr, createIndexEventTipsetKeyCid, createIndexEventHeight, - createIndexEventReverted, `CREATE TABLE IF NOT EXISTS event_entry ( event_id INTEGER, @@ -55,8 +53,6 @@ var ddls = []string{ createTableEventsSeen, - createIndexEventEntryIndexedKey, - createIndexEventEntryCodecValue, createIndexEventEntryEventId, createIndexEventsSeenHeight, createIndexEventsSeenTipsetKeyCid, @@ -67,7 +63,14 @@ var ( ) const ( - eventExists = `SELECT MAX(id) FROM event WHERE height=? AND tipset_key=? AND tipset_key_cid=? AND emitter_addr=? AND event_index=? AND message_cid=? AND message_index=?` + createTableEventsSeen = `CREATE TABLE IF NOT EXISTS events_seen ( + id INTEGER PRIMARY KEY, + height INTEGER NOT NULL, + tipset_key_cid BLOB NOT NULL, + reverted INTEGER NOT NULL, + UNIQUE(height, tipset_key_cid) + )` + insertEvent = `INSERT OR IGNORE INTO event(height, tipset_key, tipset_key_cid, emitter_addr, event_index, message_cid, message_index, reverted) VALUES(?, ?, ?, ?, ?, ?, ?, ?)` insertEntry = `INSERT OR IGNORE INTO event_entry(event_id, indexed, flags, key, codec, value) VALUES(?, ?, ?, ?, ?, ?)` revertEventsInTipset = `UPDATE event SET reverted=true WHERE height=? 
AND tipset_key=?` @@ -75,26 +78,53 @@ const ( revertEventSeen = `UPDATE events_seen SET reverted=true WHERE height=? AND tipset_key_cid=?` restoreEventSeen = `UPDATE events_seen SET reverted=false WHERE height=? AND tipset_key_cid=?` upsertEventsSeen = `INSERT INTO events_seen(height, tipset_key_cid, reverted) VALUES(?, ?, false) ON CONFLICT(height, tipset_key_cid) DO UPDATE SET reverted=false` - isTipsetProcessed = `SELECT COUNT(*) > 0 FROM events_seen WHERE tipset_key_cid=?` - getMaxHeightInIndex = `SELECT MAX(height) FROM events_seen` - isHeightProcessed = `SELECT COUNT(*) > 0 FROM events_seen WHERE height=?` - createIndexEventEmitterAddr = `CREATE INDEX IF NOT EXISTS event_emitter_addr ON event (emitter_addr)` + eventExists = `SELECT MAX(id) FROM event WHERE height=? AND tipset_key=? AND tipset_key_cid=? AND emitter_addr=? AND event_index=? AND message_cid=? AND message_index=?` + // QUERY PLAN + // `--SEARCH event USING INDEX event_height (height=?) + isTipsetProcessed = `SELECT COUNT(*) > 0 FROM events_seen WHERE tipset_key_cid=?` + // QUERY PLAN + // `--SEARCH events_seen USING COVERING INDEX events_seen_tipset_key_cid (tipset_key_cid=?) + getMaxHeightInIndex = `SELECT MAX(height) FROM events_seen` + // QUERY PLAN + // `--SEARCH events_seen USING COVERING INDEX events_seen_height + isHeightProcessed = `SELECT COUNT(*) > 0 FROM events_seen WHERE height=?` + // QUERY PLAN + // `--SEARCH events_seen USING COVERING INDEX events_seen_height (height=?) + + // When modifying indexes in this file, it is critical to test the query plan (EXPLAIN QUERY PLAN) + // of all the variations of queries built by prefillFilter to ensure that the query first hits + // an index that narrows down results to an epoch or a reasonable range of epochs. Specifically, + // event_tipset_key_cid or event_height should be the first index. Then further narrowing can take + // place within the small subset of results. 
+ // Unfortunately SQLite has some quirks in index selection that mean that certain query types will + // bypass these indexes if alternatives are available. This has been observed specifically on + // queries with height ranges: `height>=X AND height<=Y`. + // + // e.g. we want to see that `event_height` is the first index used in this query: + // + // EXPLAIN QUERY PLAN + // SELECT + // event.height, event.tipset_key_cid, event_entry.indexed, event_entry.codec, event_entry.key, event_entry.value + // FROM event + // JOIN + // event_entry ON event.id=event_entry.event_id, + // event_entry ee2 ON event.id=ee2.event_id + // WHERE event.height>=? AND event.height<=? AND event.reverted=? AND event.emitter_addr=? AND ee2.indexed=1 AND ee2.key=? + // ORDER BY event.height DESC, event_entry._rowid_ ASC + // + // -> + // + // QUERY PLAN + // |--SEARCH event USING INDEX event_height (height>? AND height= 0 { - clauses = append(clauses, "event.height>=?") + if f.minHeight >= 0 && f.minHeight == f.maxHeight { + clauses = append(clauses, "event.height=?") values = append(values, f.minHeight) - } - if f.maxHeight >= 0 { - clauses = append(clauses, "event.height<=?") - values = append(values, f.maxHeight) + } else { + if f.maxHeight >= 0 && f.minHeight >= 0 { + clauses = append(clauses, "event.height BETWEEN ? 
AND ?") + values = append(values, f.minHeight, f.maxHeight) + } else if f.minHeight >= 0 { + clauses = append(clauses, "event.height >= ?") + values = append(values, f.minHeight) + } else if f.maxHeight >= 0 { + clauses = append(clauses, "event.height <= ?") + values = append(values, f.maxHeight) + } } } @@ -496,12 +534,10 @@ func (ei *EventIndex) prefillFilter(ctx context.Context, f *eventFilter, exclude } if len(f.addresses) > 0 { - subclauses := make([]string, 0, len(f.addresses)) for _, addr := range f.addresses { - subclauses = append(subclauses, "emitter_addr=?") values = append(values, addr.Bytes()) } - clauses = append(clauses, "("+strings.Join(subclauses, " OR ")+")") + clauses = append(clauses, "event.emitter_addr IN ("+strings.Repeat("?,", len(f.addresses)-1)+"?)") } if len(f.keysWithCodec) > 0 { @@ -510,7 +546,7 @@ func (ei *EventIndex) prefillFilter(ctx context.Context, f *eventFilter, exclude if len(vals) > 0 { join++ joinAlias := fmt.Sprintf("ee%d", join) - joins = append(joins, fmt.Sprintf("event_entry %s on event.id=%[1]s.event_id", joinAlias)) + joins = append(joins, fmt.Sprintf("event_entry %s ON event.id=%[1]s.event_id", joinAlias)) clauses = append(clauses, fmt.Sprintf("%s.indexed=1 AND %[1]s.key=?", joinAlias)) values = append(values, key) subclauses := make([]string, 0, len(vals)) diff --git a/chain/events/filter/index_migrations.go b/chain/events/filter/index_migrations.go index fe8a371a513..bf8fd2f943c 100644 --- a/chain/events/filter/index_migrations.go +++ b/chain/events/filter/index_migrations.go @@ -137,17 +137,10 @@ func migrationVersion2(db *sql.DB, chainStore *store.ChainStore) sqlite.Migratio // migrationVersion3 migrates the schema from version 2 to version 3 by creating two indices: // 1) an index on the event.emitter_addr column, and 2) an index on the event_entry.key column. +// +// As of version 7, these indices have been removed as they were found to be a performance +// hindrance. This migration is now a no-op. 
func migrationVersion3(ctx context.Context, tx *sql.Tx) error { - // create index on event.emitter_addr. - _, err := tx.ExecContext(ctx, createIndexEventEmitterAddr) - if err != nil { - return xerrors.Errorf("create index event_emitter_addr: %w", err) - } - - // original v3 migration introduced an index: - // CREATE INDEX IF NOT EXISTS event_entry_key_index ON event_entry (key) - // which has subsequently been removed in v4, so it's omitted here - return nil } @@ -161,10 +154,13 @@ func migrationVersion3(ctx context.Context, tx *sql.Tx) error { // And then creating the following indices: // 1. an index on the event.tipset_key_cid column // 2. an index on the event.height column -// 3. an index on the event.reverted column -// 4. an index on the event_entry.indexed and event_entry.key columns -// 5. an index on the event_entry.codec and event_entry.value columns +// 3. an index on the event.reverted column (removed in version 7) +// 4. a composite index on the event_entry.indexed and event_entry.key columns (removed in version 7) +// 5. a composite index on the event_entry.codec and event_entry.value columns (removed in version 7) // 6. an index on the event_entry.event_id column +// +// Indexes 3, 4, and 5 were removed in version 7 as they were found to be a performance hindrance so +// are omitted here. 
func migrationVersion4(ctx context.Context, tx *sql.Tx) error { for _, create := range []struct { desc string @@ -174,9 +170,6 @@ func migrationVersion4(ctx context.Context, tx *sql.Tx) error { {"drop index event_entry_key_index", "DROP INDEX IF EXISTS event_entry_key_index;"}, {"create index event_tipset_key_cid", createIndexEventTipsetKeyCid}, {"create index event_height", createIndexEventHeight}, - {"create index event_reverted", createIndexEventReverted}, - {"create index event_entry_indexed_key", createIndexEventEntryIndexedKey}, - {"create index event_entry_codec_value", createIndexEventEntryCodecValue}, {"create index event_entry_event_id", createIndexEventEntryEventId}, } { if _, err := tx.ExecContext(ctx, create.query); err != nil { @@ -236,3 +229,32 @@ func migrationVersion6(ctx context.Context, tx *sql.Tx) error { return nil } + +// migrationVersion7 migrates the schema from version 6 to version 7 by dropping the following +// indices: +// 1. the index on the event.emitter_addr column +// 2. the index on the event.reverted column +// 3. the composite index on the event_entry.indexed and event_entry.key columns +// 4. the composite index on the event_entry.codec and event_entry.value columns +// +// These indices were found to be a performance hindrance as they prevent SQLite from using the +// intended initial indexes on height or tipset_key_cid in many query variations. Without additional +// indices to fall-back on, SQLite is forced to narrow down each query via height or tipset_key_cid +// which is the desired behavior. 
+func migrationVersion7(ctx context.Context, tx *sql.Tx) error { + for _, drop := range []struct { + desc string + query string + }{ + {"drop index event_emitter_addr", "DROP INDEX IF EXISTS event_emitter_addr;"}, + {"drop index event_reverted", "DROP INDEX IF EXISTS event_reverted;"}, + {"drop index event_entry_indexed_key", "DROP INDEX IF EXISTS event_entry_indexed_key;"}, + {"drop index event_entry_codec_value", "DROP INDEX IF EXISTS event_entry_codec_value;"}, + } { + if _, err := tx.ExecContext(ctx, drop.query); err != nil { + return xerrors.Errorf("%s: %w", drop.desc, err) + } + } + + return nil +} From 2a3337e0af49f554a2d08cef03a4f88dfd4a3516 Mon Sep 17 00:00:00 2001 From: Rod Vagg Date: Fri, 19 Jul 2024 20:35:11 +1000 Subject: [PATCH 2/2] test: events: test sqlite indexes used in query plan of all selects and updates --- chain/events/filter/index.go | 335 +++++++++++++----------------- chain/events/filter/index_test.go | 87 ++++++++ 2 files changed, 237 insertions(+), 185 deletions(-) diff --git a/chain/events/filter/index.go b/chain/events/filter/index.go index bec3888b16c..ff7f1aeaa7e 100644 --- a/chain/events/filter/index.go +++ b/chain/events/filter/index.go @@ -71,27 +71,6 @@ const ( UNIQUE(height, tipset_key_cid) )` - insertEvent = `INSERT OR IGNORE INTO event(height, tipset_key, tipset_key_cid, emitter_addr, event_index, message_cid, message_index, reverted) VALUES(?, ?, ?, ?, ?, ?, ?, ?)` - insertEntry = `INSERT OR IGNORE INTO event_entry(event_id, indexed, flags, key, codec, value) VALUES(?, ?, ?, ?, ?, ?)` - revertEventsInTipset = `UPDATE event SET reverted=true WHERE height=? AND tipset_key=?` - restoreEvent = `UPDATE event SET reverted=false WHERE height=? AND tipset_key=? AND tipset_key_cid=? AND emitter_addr=? AND event_index=? AND message_cid=? AND message_index=?` - revertEventSeen = `UPDATE events_seen SET reverted=true WHERE height=? AND tipset_key_cid=?` - restoreEventSeen = `UPDATE events_seen SET reverted=false WHERE height=? 
AND tipset_key_cid=?` - upsertEventsSeen = `INSERT INTO events_seen(height, tipset_key_cid, reverted) VALUES(?, ?, false) ON CONFLICT(height, tipset_key_cid) DO UPDATE SET reverted=false` - - eventExists = `SELECT MAX(id) FROM event WHERE height=? AND tipset_key=? AND tipset_key_cid=? AND emitter_addr=? AND event_index=? AND message_cid=? AND message_index=?` - // QUERY PLAN - // `--SEARCH event USING INDEX event_height (height=?) - isTipsetProcessed = `SELECT COUNT(*) > 0 FROM events_seen WHERE tipset_key_cid=?` - // QUERY PLAN - // `--SEARCH events_seen USING COVERING INDEX events_seen_tipset_key_cid (tipset_key_cid=?) - getMaxHeightInIndex = `SELECT MAX(height) FROM events_seen` - // QUERY PLAN - // `--SEARCH events_seen USING COVERING INDEX events_seen_height - isHeightProcessed = `SELECT COUNT(*) > 0 FROM events_seen WHERE height=?` - // QUERY PLAN - // `--SEARCH events_seen USING COVERING INDEX events_seen_height (height=?) - // When modifying indexes in this file, it is critical to test the query plan (EXPLAIN QUERY PLAN) // of all the variations of queries built by prefillFilter to ensure that the query first hits // an index that narrows down results to an epoch or a reasonable range of epochs. Specifically, @@ -130,21 +109,45 @@ const ( createIndexEventsSeenTipsetKeyCid = `CREATE INDEX IF NOT EXISTS events_seen_tipset_key_cid ON events_seen (tipset_key_cid);` ) +// preparedStatementMapping returns a map of fields of the preparedStatements struct to the SQL +// query that should be prepared for that field. This is used to prepare all the statements in +// the preparedStatements struct but it's also used by testing code to access the raw query strings +// to ensure that the correct indexes are being used by SELECT queries. 
+func preparedStatementMapping(ps *preparedStatements) map[**sql.Stmt]string { + return map[**sql.Stmt]string{ + &ps.insertEvent: `INSERT OR IGNORE INTO event(height, tipset_key, tipset_key_cid, emitter_addr, event_index, message_cid, message_index, reverted) VALUES(?, ?, ?, ?, ?, ?, ?, ?)`, + &ps.insertEntry: `INSERT OR IGNORE INTO event_entry(event_id, indexed, flags, key, codec, value) VALUES(?, ?, ?, ?, ?, ?)`, + &ps.revertEventsInTipset: `UPDATE event SET reverted=true WHERE height=? AND tipset_key=?`, + &ps.restoreEvent: `UPDATE event SET reverted=false WHERE height=? AND tipset_key=? AND tipset_key_cid=? AND emitter_addr=? AND event_index=? AND message_cid=? AND message_index=?`, + &ps.revertEventSeen: `UPDATE events_seen SET reverted=true WHERE height=? AND tipset_key_cid=?`, + &ps.restoreEventSeen: `UPDATE events_seen SET reverted=false WHERE height=? AND tipset_key_cid=?`, + &ps.upsertEventsSeen: `INSERT INTO events_seen(height, tipset_key_cid, reverted) VALUES(?, ?, false) ON CONFLICT(height, tipset_key_cid) DO UPDATE SET reverted=false`, + &ps.eventExists: `SELECT MAX(id) FROM event WHERE height=? AND tipset_key=? AND tipset_key_cid=? AND emitter_addr=? AND event_index=? AND message_cid=? AND message_index=?`, // QUERY PLAN: SEARCH event USING INDEX event_height (height=?) + &ps.isTipsetProcessed: `SELECT COUNT(*) > 0 FROM events_seen WHERE tipset_key_cid=?`, // QUERY PLAN: SEARCH events_seen USING COVERING INDEX events_seen_tipset_key_cid (tipset_key_cid=?) + &ps.getMaxHeightInIndex: `SELECT MAX(height) FROM events_seen`, // QUERY PLAN: SEARCH events_seen USING COVERING INDEX events_seen_height + &ps.isHeightProcessed: `SELECT COUNT(*) > 0 FROM events_seen WHERE height=?`, // QUERY PLAN: SEARCH events_seen USING COVERING INDEX events_seen_height (height=?) 
+ + } +} + +type preparedStatements struct { + insertEvent *sql.Stmt + insertEntry *sql.Stmt + revertEventsInTipset *sql.Stmt + restoreEvent *sql.Stmt + upsertEventsSeen *sql.Stmt + revertEventSeen *sql.Stmt + restoreEventSeen *sql.Stmt + eventExists *sql.Stmt + isTipsetProcessed *sql.Stmt + getMaxHeightInIndex *sql.Stmt + isHeightProcessed *sql.Stmt +} + type EventIndex struct { db *sql.DB - stmtEventExists *sql.Stmt - stmtInsertEvent *sql.Stmt - stmtInsertEntry *sql.Stmt - stmtRevertEventsInTipset *sql.Stmt - stmtRestoreEvent *sql.Stmt - stmtUpsertEventsSeen *sql.Stmt - stmtRevertEventSeen *sql.Stmt - stmtRestoreEventSeen *sql.Stmt - - stmtIsTipsetProcessed *sql.Stmt - stmtGetMaxHeightInIndex *sql.Stmt - stmtIsHeightProcessed *sql.Stmt + stmt *preparedStatements mu sync.Mutex subIdCounter uint64 @@ -159,65 +162,6 @@ type updateSub struct { type EventIndexUpdated struct{} -func (ei *EventIndex) initStatements() (err error) { - ei.stmtEventExists, err = ei.db.Prepare(eventExists) - if err != nil { - return xerrors.Errorf("prepare stmtEventExists: %w", err) - } - - ei.stmtInsertEvent, err = ei.db.Prepare(insertEvent) - if err != nil { - return xerrors.Errorf("prepare stmtInsertEvent: %w", err) - } - - ei.stmtInsertEntry, err = ei.db.Prepare(insertEntry) - if err != nil { - return xerrors.Errorf("prepare stmtInsertEntry: %w", err) - } - - ei.stmtRevertEventsInTipset, err = ei.db.Prepare(revertEventsInTipset) - if err != nil { - return xerrors.Errorf("prepare stmtRevertEventsInTipset: %w", err) - } - - ei.stmtRestoreEvent, err = ei.db.Prepare(restoreEvent) - if err != nil { - return xerrors.Errorf("prepare stmtRestoreEvent: %w", err) - } - - ei.stmtUpsertEventsSeen, err = ei.db.Prepare(upsertEventsSeen) - if err != nil { - return xerrors.Errorf("prepare stmtUpsertEventsSeen: %w", err) - } - - ei.stmtRevertEventSeen, err = ei.db.Prepare(revertEventSeen) - if err != nil { - return xerrors.Errorf("prepare stmtRevertEventSeen: %w", err) - } - - ei.stmtRestoreEventSeen, 
err = ei.db.Prepare(restoreEventSeen) - if err != nil { - return xerrors.Errorf("prepare stmtRestoreEventSeen: %w", err) - } - - ei.stmtIsTipsetProcessed, err = ei.db.Prepare(isTipsetProcessed) - if err != nil { - return xerrors.Errorf("prepare isTipsetProcessed: %w", err) - } - - ei.stmtGetMaxHeightInIndex, err = ei.db.Prepare(getMaxHeightInIndex) - if err != nil { - return xerrors.Errorf("prepare getMaxHeightInIndex: %w", err) - } - - ei.stmtIsHeightProcessed, err = ei.db.Prepare(isHeightProcessed) - if err != nil { - return xerrors.Errorf("prepare isHeightProcessed: %w", err) - } - - return nil -} - func NewEventIndex(ctx context.Context, path string, chainStore *store.ChainStore) (*EventIndex, error) { db, _, err := sqlite.Open(path) if err != nil { @@ -237,7 +181,10 @@ func NewEventIndex(ctx context.Context, path string, chainStore *store.ChainStor return nil, xerrors.Errorf("failed to setup event index db: %w", err) } - eventIndex := EventIndex{db: db} + eventIndex := EventIndex{ + db: db, + stmt: &preparedStatements{}, + } if err = eventIndex.initStatements(); err != nil { _ = db.Close() @@ -249,6 +196,19 @@ func NewEventIndex(ctx context.Context, path string, chainStore *store.ChainStor return &eventIndex, nil } +func (ei *EventIndex) initStatements() error { + stmtMapping := preparedStatementMapping(ei.stmt) + for stmtPointer, query := range stmtMapping { + var err error + *stmtPointer, err = ei.db.Prepare(query) + if err != nil { + return xerrors.Errorf("prepare statement [%s]: %w", query, err) + } + } + + return nil +} + func (ei *EventIndex) Close() error { if ei.db == nil { return nil @@ -290,7 +250,7 @@ func (ei *EventIndex) SubscribeUpdates() (chan EventIndexUpdated, func()) { } func (ei *EventIndex) GetMaxHeightInIndex(ctx context.Context) (uint64, error) { - row := ei.stmtGetMaxHeightInIndex.QueryRowContext(ctx) + row := ei.stmt.getMaxHeightInIndex.QueryRowContext(ctx) var maxHeight uint64 err := row.Scan(&maxHeight) return maxHeight, err @@ -305,7 
+265,7 @@ func (ei *EventIndex) IsHeightPast(ctx context.Context, height uint64) (bool, er } func (ei *EventIndex) IsTipsetProcessed(ctx context.Context, tipsetKeyCid []byte) (bool, error) { - row := ei.stmtIsTipsetProcessed.QueryRowContext(ctx, tipsetKeyCid) + row := ei.stmt.isTipsetProcessed.QueryRowContext(ctx, tipsetKeyCid) var exists bool err := row.Scan(&exists) return exists, err @@ -326,12 +286,12 @@ func (ei *EventIndex) CollectEvents(ctx context.Context, te *TipSetEvents, rever // lets handle the revert case first, since its simpler and we can simply mark all events in this tipset as reverted and return if revert { - _, err = tx.Stmt(ei.stmtRevertEventsInTipset).Exec(te.msgTs.Height(), te.msgTs.Key().Bytes()) + _, err = tx.Stmt(ei.stmt.revertEventsInTipset).Exec(te.msgTs.Height(), te.msgTs.Key().Bytes()) if err != nil { return xerrors.Errorf("revert event: %w", err) } - _, err = tx.Stmt(ei.stmtRevertEventSeen).Exec(te.msgTs.Height(), tsKeyCid.Bytes()) + _, err = tx.Stmt(ei.stmt.revertEventSeen).Exec(te.msgTs.Height(), tsKeyCid.Bytes()) if err != nil { return xerrors.Errorf("revert event seen: %w", err) } @@ -388,7 +348,7 @@ func (ei *EventIndex) CollectEvents(ctx context.Context, te *TipSetEvents, rever // check if this event already exists in the database var entryID sql.NullInt64 - err = tx.Stmt(ei.stmtEventExists).QueryRow( + err = tx.Stmt(ei.stmt.eventExists).QueryRow( te.msgTs.Height(), // height te.msgTs.Key().Bytes(), // tipset_key tsKeyCid.Bytes(), // tipset_key_cid @@ -403,7 +363,7 @@ func (ei *EventIndex) CollectEvents(ctx context.Context, te *TipSetEvents, rever if !entryID.Valid { // event does not exist, lets insert it - res, err := tx.Stmt(ei.stmtInsertEvent).Exec( + res, err := tx.Stmt(ei.stmt.insertEvent).Exec( te.msgTs.Height(), // height te.msgTs.Key().Bytes(), // tipset_key tsKeyCid.Bytes(), // tipset_key_cid @@ -424,7 +384,7 @@ func (ei *EventIndex) CollectEvents(ctx context.Context, te *TipSetEvents, rever // insert all the entries 
for this event for _, entry := range ev.Entries { - _, err = tx.Stmt(ei.stmtInsertEntry).Exec( + _, err = tx.Stmt(ei.stmt.insertEntry).Exec( entryID.Int64, // event_id isIndexedValue(entry.Flags), // indexed []byte{entry.Flags}, // flags @@ -438,7 +398,7 @@ func (ei *EventIndex) CollectEvents(ctx context.Context, te *TipSetEvents, rever } } else { // event already exists, lets mark it as not reverted - res, err := tx.Stmt(ei.stmtRestoreEvent).Exec( + res, err := tx.Stmt(ei.stmt.restoreEvent).Exec( te.msgTs.Height(), // height te.msgTs.Key().Bytes(), // tipset_key tsKeyCid.Bytes(), // tipset_key_cid @@ -467,7 +427,7 @@ func (ei *EventIndex) CollectEvents(ctx context.Context, te *TipSetEvents, rever // this statement will mark the tipset as processed and will insert a new row if it doesn't exist // or update the reverted field to false if it does - _, err = tx.Stmt(ei.stmtUpsertEventsSeen).Exec( + _, err = tx.Stmt(ei.stmt.upsertEventsSeen).Exec( te.msgTs.Height(), tsKeyCid.Bytes(), ) @@ -503,90 +463,9 @@ func (ei *EventIndex) CollectEvents(ctx context.Context, te *TipSetEvents, rever // prefillFilter fills a filter's collection of events from the historic index func (ei *EventIndex) prefillFilter(ctx context.Context, f *eventFilter, excludeReverted bool) error { - clauses := []string{} - values := []any{} - joins := []string{} - - if f.tipsetCid != cid.Undef { - clauses = append(clauses, "event.tipset_key_cid=?") - values = append(values, f.tipsetCid.Bytes()) - } else { - if f.minHeight >= 0 && f.minHeight == f.maxHeight { - clauses = append(clauses, "event.height=?") - values = append(values, f.minHeight) - } else { - if f.maxHeight >= 0 && f.minHeight >= 0 { - clauses = append(clauses, "event.height BETWEEN ? 
AND ?") - values = append(values, f.minHeight, f.maxHeight) - } else if f.minHeight >= 0 { - clauses = append(clauses, "event.height >= ?") - values = append(values, f.minHeight) - } else if f.maxHeight >= 0 { - clauses = append(clauses, "event.height <= ?") - values = append(values, f.maxHeight) - } - } - } - - if excludeReverted { - clauses = append(clauses, "event.reverted=?") - values = append(values, false) - } - - if len(f.addresses) > 0 { - for _, addr := range f.addresses { - values = append(values, addr.Bytes()) - } - clauses = append(clauses, "event.emitter_addr IN ("+strings.Repeat("?,", len(f.addresses)-1)+"?)") - } - - if len(f.keysWithCodec) > 0 { - join := 0 - for key, vals := range f.keysWithCodec { - if len(vals) > 0 { - join++ - joinAlias := fmt.Sprintf("ee%d", join) - joins = append(joins, fmt.Sprintf("event_entry %s ON event.id=%[1]s.event_id", joinAlias)) - clauses = append(clauses, fmt.Sprintf("%s.indexed=1 AND %[1]s.key=?", joinAlias)) - values = append(values, key) - subclauses := make([]string, 0, len(vals)) - for _, val := range vals { - subclauses = append(subclauses, fmt.Sprintf("(%s.value=? 
AND %[1]s.codec=?)", joinAlias)) - values = append(values, val.Value, val.Codec) - } - clauses = append(clauses, "("+strings.Join(subclauses, " OR ")+")") - } - } - } + values, query := makePrefillFilterQuery(f, excludeReverted) - s := `SELECT - event.id, - event.height, - event.tipset_key, - event.tipset_key_cid, - event.emitter_addr, - event.event_index, - event.message_cid, - event.message_index, - event.reverted, - event_entry.flags, - event_entry.key, - event_entry.codec, - event_entry.value - FROM event JOIN event_entry ON event.id=event_entry.event_id` - - if len(joins) > 0 { - s = s + ", " + strings.Join(joins, ", ") - } - - if len(clauses) > 0 { - s = s + " WHERE " + strings.Join(clauses, " AND ") - } - - // retain insertion order of event_entry rows with the implicit _rowid_ column - s += " ORDER BY event.height DESC, event_entry._rowid_ ASC" - - stmt, err := ei.db.Prepare(s) + stmt, err := ei.db.Prepare(query) if err != nil { return xerrors.Errorf("prepare prefill query: %w", err) } @@ -705,3 +584,89 @@ func (ei *EventIndex) prefillFilter(ctx context.Context, f *eventFilter, exclude return nil } + +func makePrefillFilterQuery(f *eventFilter, excludeReverted bool) ([]any, string) { + clauses := []string{} + values := []any{} + joins := []string{} + + if f.tipsetCid != cid.Undef { + clauses = append(clauses, "event.tipset_key_cid=?") + values = append(values, f.tipsetCid.Bytes()) + } else { + if f.minHeight >= 0 && f.minHeight == f.maxHeight { + clauses = append(clauses, "event.height=?") + values = append(values, f.minHeight) + } else { + if f.maxHeight >= 0 && f.minHeight >= 0 { + clauses = append(clauses, "event.height BETWEEN ? 
AND ?") + values = append(values, f.minHeight, f.maxHeight) + } else if f.minHeight >= 0 { + clauses = append(clauses, "event.height >= ?") + values = append(values, f.minHeight) + } else if f.maxHeight >= 0 { + clauses = append(clauses, "event.height <= ?") + values = append(values, f.maxHeight) + } + } + } + + if excludeReverted { + clauses = append(clauses, "event.reverted=?") + values = append(values, false) + } + + if len(f.addresses) > 0 { + for _, addr := range f.addresses { + values = append(values, addr.Bytes()) + } + clauses = append(clauses, "event.emitter_addr IN ("+strings.Repeat("?,", len(f.addresses)-1)+"?)") + } + + if len(f.keysWithCodec) > 0 { + join := 0 + for key, vals := range f.keysWithCodec { + if len(vals) > 0 { + join++ + joinAlias := fmt.Sprintf("ee%d", join) + joins = append(joins, fmt.Sprintf("event_entry %s ON event.id=%[1]s.event_id", joinAlias)) + clauses = append(clauses, fmt.Sprintf("%s.indexed=1 AND %[1]s.key=?", joinAlias)) + values = append(values, key) + subclauses := make([]string, 0, len(vals)) + for _, val := range vals { + subclauses = append(subclauses, fmt.Sprintf("(%s.value=? 
AND %[1]s.codec=?)", joinAlias)) + values = append(values, val.Value, val.Codec) + } + clauses = append(clauses, "("+strings.Join(subclauses, " OR ")+")") + } + } + } + + s := `SELECT + event.id, + event.height, + event.tipset_key, + event.tipset_key_cid, + event.emitter_addr, + event.event_index, + event.message_cid, + event.message_index, + event.reverted, + event_entry.flags, + event_entry.key, + event_entry.codec, + event_entry.value + FROM event JOIN event_entry ON event.id=event_entry.event_id` + + if len(joins) > 0 { + s = s + ", " + strings.Join(joins, ", ") + } + + if len(clauses) > 0 { + s = s + " WHERE " + strings.Join(clauses, " AND ") + } + + // retain insertion order of event_entry rows with the implicit _rowid_ column + s += " ORDER BY event.height DESC, event_entry._rowid_ ASC" + return values, s +} diff --git a/chain/events/filter/index_test.go b/chain/events/filter/index_test.go index ad08da5aedb..008b5697130 100644 --- a/chain/events/filter/index_test.go +++ b/chain/events/filter/index_test.go @@ -5,8 +5,11 @@ import ( pseudo "math/rand" "os" "path/filepath" + "regexp" + "strings" "testing" + "github.com/ipfs/go-cid" "github.com/stretchr/testify/require" "github.com/filecoin-project/go-address" @@ -957,3 +960,87 @@ func TestEventIndexPrefillFilterExcludeReverted(t *testing.T) { }) } } + +// TestQueryPlan is to ensure that future modifications to the db schema, or future upgrades to +// sqlite, do not change the query plan of the prepared statements used by the event index such that +// queries hit undesirable indexes which are likely to slow down the query. +// Changes that break this test need to be sure that the query plan is still efficient for the +// expected query patterns. 
+func TestQueryPlan(t *testing.T) { + ei, err := NewEventIndex(context.Background(), filepath.Join(t.TempDir(), "actorevents.db"), nil) + require.NoError(t, err, "create event index") + + verifyQueryPlan := func(stmt string) { + rows, err := ei.db.Query("EXPLAIN QUERY PLAN " + strings.Replace(stmt, "?", "1", -1)) + require.NoError(t, err, "explain query plan for query: "+stmt) + defer func() { + require.NoError(t, rows.Close()) + }() + // First response to EXPLAIN QUERY PLAN should show us the use of an index that we want to + // encounter first to narrow down the search space - either a height or tipset_key_cid index + // - sqlite_autoindex_events_seen_1 is for the UNIQUE constraint on events_seen + // - events_seen_height and events_seen_tipset_key_cid are explicit indexes on events_seen + // - event_height and event_tipset_key_cid are explicit indexes on event + rows.Next() + var id, parent, notused, detail string + require.NoError(t, rows.Scan(&id, &parent, &notused, &detail), "scan explain query plan for query: "+stmt) + detail = strings.TrimSpace(detail) + var expectedIndexes = []string{ + "sqlite_autoindex_events_seen_1", + "events_seen_height", + "events_seen_tipset_key_cid", + "event_height", + "event_tipset_key_cid", + } + indexUsed := false + for _, index := range expectedIndexes { + if strings.Contains(detail, " INDEX "+index) { + indexUsed = true + break + } + } + require.True(t, indexUsed, "index used for query: "+stmt+" detail: "+detail) + + stmt = regexp.MustCompile(`(?m)^\s+`).ReplaceAllString(stmt, " ") // remove all leading whitespace from the statement + stmt = strings.Replace(stmt, "\n", "", -1) // remove all newlines from the statement + t.Logf("[%s] has plan start: %s", stmt, detail) + } + + // Test the hard-coded select and update queries + stmtMap := preparedStatementMapping(&preparedStatements{}) + for _, stmt := range stmtMap { + if strings.HasPrefix(strings.TrimSpace(strings.ToLower(stmt)), "insert") { + continue + } + verifyQueryPlan(stmt)
+ } + + // Test the dynamic prefillFilter queries + prefillCases := []*eventFilter{ + {}, + {minHeight: 14000, maxHeight: 14000}, + {minHeight: 14000, maxHeight: 15000}, + {tipsetCid: cid.MustParse("bafkqaaa")}, + {minHeight: 14000, maxHeight: 14000, addresses: []address.Address{address.TestAddress}}, + {minHeight: 14000, maxHeight: 15000, addresses: []address.Address{address.TestAddress}}, + {tipsetCid: cid.MustParse("bafkqaaa"), addresses: []address.Address{address.TestAddress}}, + {minHeight: 14000, maxHeight: 14000, addresses: []address.Address{address.TestAddress, address.TestAddress}}, + {minHeight: 14000, maxHeight: 15000, addresses: []address.Address{address.TestAddress, address.TestAddress}}, + {tipsetCid: cid.MustParse("bafkqaaa"), addresses: []address.Address{address.TestAddress, address.TestAddress}}, + {minHeight: 14000, maxHeight: 14000, keysWithCodec: keysToKeysWithCodec(map[string][][]byte{"type": {[]byte("approval")}})}, + {minHeight: 14000, maxHeight: 15000, keysWithCodec: keysToKeysWithCodec(map[string][][]byte{"type": {[]byte("approval")}})}, + {tipsetCid: cid.MustParse("bafkqaaa"), keysWithCodec: keysToKeysWithCodec(map[string][][]byte{"type": {[]byte("approval")}})}, + {minHeight: 14000, maxHeight: 14000, keysWithCodec: keysToKeysWithCodec(map[string][][]byte{"type": {[]byte("approval")}, "signer": {[]byte("addr1")}})}, + {minHeight: 14000, maxHeight: 15000, keysWithCodec: keysToKeysWithCodec(map[string][][]byte{"type": {[]byte("approval")}, "signer": {[]byte("addr1")}})}, + {tipsetCid: cid.MustParse("bafkqaaa"), keysWithCodec: keysToKeysWithCodec(map[string][][]byte{"type": {[]byte("approval")}, "signer": {[]byte("addr1")}})}, + {minHeight: 14000, maxHeight: 14000, addresses: []address.Address{address.TestAddress, address.TestAddress}, keysWithCodec: keysToKeysWithCodec(map[string][][]byte{"type": {[]byte("approval")}, "signer": {[]byte("addr1")}})}, + {minHeight: 14000, maxHeight: 15000, addresses: []address.Address{address.TestAddress, 
address.TestAddress}, keysWithCodec: keysToKeysWithCodec(map[string][][]byte{"type": {[]byte("approval")}, "signer": {[]byte("addr1")}})}, + {tipsetCid: cid.MustParse("bafkqaaa"), addresses: []address.Address{address.TestAddress, address.TestAddress}, keysWithCodec: keysToKeysWithCodec(map[string][][]byte{"type": {[]byte("approval")}, "signer": {[]byte("addr1")}})}, + } + for _, filter := range prefillCases { + _, query := makePrefillFilterQuery(filter, true) + verifyQueryPlan(query) + _, query = makePrefillFilterQuery(filter, false) + verifyQueryPlan(query) + } +}