From b9128f2a39c407149b26e1c8da2f427a883b2aeb Mon Sep 17 00:00:00 2001 From: aarshkshah1992 Date: Wed, 12 Jun 2024 15:00:20 +0400 Subject: [PATCH 01/20] record seen event epochs --- chain/events/filter/index.go | 112 ++++++++++++++++++++++++++++-- chain/events/filter/index_test.go | 5 ++ 2 files changed, 111 insertions(+), 6 deletions(-) diff --git a/chain/events/filter/index.go b/chain/events/filter/index.go index 98cc54d2029..4e308fe0a83 100644 --- a/chain/events/filter/index.go +++ b/chain/events/filter/index.go @@ -61,9 +61,13 @@ var ddls = []string{ value BLOB NOT NULL )`, + createTableEventsSeen, + createIndexEventEntryIndexedKey, createIndexEventEntryCodecValue, createIndexEventEntryEventId, + createIndexEventsSeenHeight, + createIndexEventsSeenTipsetKeyCid, // metadata containing version of schema `CREATE TABLE IF NOT EXISTS _meta ( @@ -75,6 +79,7 @@ var ddls = []string{ `INSERT OR IGNORE INTO _meta (version) VALUES (3)`, `INSERT OR IGNORE INTO _meta (version) VALUES (4)`, `INSERT OR IGNORE INTO _meta (version) VALUES (5)`, + `INSERT OR IGNORE INTO _meta (version) VALUES (6)`, } var ( @@ -82,13 +87,14 @@ var ( ) const ( - schemaVersion = 5 + schemaVersion = 6 eventExists = `SELECT MAX(id) FROM event WHERE height=? AND tipset_key=? AND tipset_key_cid=? AND emitter_addr=? AND event_index=? AND message_cid=? AND message_index=?` insertEvent = `INSERT OR IGNORE INTO event(height, tipset_key, tipset_key_cid, emitter_addr, event_index, message_cid, message_index, reverted) VALUES(?, ?, ?, ?, ?, ?, ?, ?)` insertEntry = `INSERT OR IGNORE INTO event_entry(event_id, indexed, flags, key, codec, value) VALUES(?, ?, ?, ?, ?, ?)` revertEventsInTipset = `UPDATE event SET reverted=true WHERE height=? AND tipset_key=?` restoreEvent = `UPDATE event SET reverted=false WHERE height=? AND tipset_key=? AND tipset_key_cid=? AND emitter_addr=? AND event_index=? AND message_cid=? AND message_index=?` + insertEventsSeen = `INSERT OR IGNORE INTO events_seen(height, tipset_key, tipset_key_cid) VALUES(?, ?, ?)` createIndexEventEmitterAddr = `CREATE INDEX IF NOT EXISTS event_emitter_addr ON event (emitter_addr)` createIndexEventTipsetKeyCid = `CREATE INDEX IF NOT EXISTS event_tipset_key_cid ON event (tipset_key_cid);` @@ -98,6 +104,16 @@ const ( createIndexEventEntryIndexedKey = `CREATE INDEX IF NOT EXISTS event_entry_indexed_key ON event_entry (indexed, key);` createIndexEventEntryCodecValue = `CREATE INDEX IF NOT EXISTS event_entry_codec_value ON event_entry (codec, value);` createIndexEventEntryEventId = `CREATE INDEX IF NOT EXISTS event_entry_event_id ON event_entry(event_id);` + + createTableEventsSeen = `CREATE TABLE IF NOT EXISTS events_seen ( + id INTEGER PRIMARY KEY, + height INTEGER NOT NULL, + tipset_key BLOB NOT NULL, + tipset_key_cid BLOB NOT NULL + )` + + createIndexEventsSeenHeight = `CREATE INDEX IF NOT EXISTS events_seen_height ON events_seen (height);` + createIndexEventsSeenTipsetKeyCid = `CREATE INDEX IF NOT EXISTS events_seen_tipset_key_cid ON events_seen (tipset_key_cid);` ) type EventIndex struct { @@ -108,6 +124,7 @@ type EventIndex struct { stmtInsertEntry *sql.Stmt stmtRevertEventsInTipset *sql.Stmt stmtRestoreEvent *sql.Stmt + stmtInsertEventsSeen *sql.Stmt } func (ei *EventIndex) initStatements() (err error) { @@ -136,6 +153,11 @@ func (ei *EventIndex) initStatements() (err error) { return xerrors.Errorf("prepare stmtRestoreEvent: %w", err) } + ei.stmtInsertEventsSeen, err = ei.db.Prepare(insertEventsSeen) + if err != nil { + return xerrors.Errorf("prepare stmtInsertEventsSeen: %w", err) + } + return nil } @@ -407,6 +429,58 @@ func (ei *EventIndex) migrateToVersion5(ctx context.Context) error { return nil } +func (ei *EventIndex) migrateToVersion6(ctx context.Context) error { + now := time.Now() + + tx, err := ei.db.BeginTx(ctx, nil) + if err != nil { + return xerrors.Errorf("begin transaction: %w", err) + } + defer func() { _ = tx.Rollback() }() + + stmtCreateTableEventsSeen, err := tx.PrepareContext(ctx, createTableEventsSeen) + if err != nil { + return xerrors.Errorf("prepare stmtCreateTableEventsSeen: %w", err) + } + _, err = stmtCreateTableEventsSeen.ExecContext(ctx) + if err != nil { + return xerrors.Errorf("create table events_seen: %w", err) + } + + _, err = tx.ExecContext(ctx, createIndexEventsSeenHeight) + if err != nil { + return xerrors.Errorf("create index events_seen_height: %w", err) + } + _, err = tx.ExecContext(ctx, createIndexEventsSeenTipsetKeyCid) + if err != nil { + return xerrors.Errorf("create index events_seen_tipset_key_cid: %w", err) + } + + // INSERT an entry in the events_seen table for all epochs we do have have events for in our DB + _, err = tx.ExecContext(ctx, ` + INSERT INTO events_seen (height, tipset_key, tipset_key_cid) + SELECT e.height, e.tipset_key, e.tipset_key_cid + FROM event e + WHERE NOT EXISTS ( + SELECT 1 FROM events_seen es + WHERE es.height = e.height AND es.tipset_key = e.tipset_key AND es.tipset_key_cid = e.tipset_key_cid + ) +`) + if err != nil { + return xerrors.Errorf("insert events into events_seen: %w", err) + } + + err = tx.Commit() + if err != nil { + return xerrors.Errorf("commit transaction: %w", err) + } + + ei.vacuumDBAndCheckpointWAL(ctx) + + log.Infof("Successfully migrated event index from version 4 to version 5 in %s", time.Since(now)) + return nil +} + func (ei *EventIndex) vacuumDBAndCheckpointWAL(ctx context.Context) { // During the large migrations, we have likely increased the WAL size a lot, so lets do some // simple DB administration to free up space (VACUUM followed by truncating the WAL file) @@ -498,6 +572,16 @@ func NewEventIndex(ctx context.Context, path string, chainStore *store.ChainStor version = 5 } + if version == 5 { + log.Infof("Upgrading event index from version 5 to version 6") + err = eventIndex.migrateToVersion6(ctx) + if err != nil { + _ = db.Close() + return nil, xerrors.Errorf("could not migrate event index schema from version 5 to version 6: %w", err) + } + version = 6 + } + if version != schemaVersion { _ = db.Close() return nil, xerrors.Errorf("invalid database version: got %d, expected %d", version, schemaVersion) @@ -520,6 +604,13 @@ func (ei *EventIndex) Close() error { return ei.db.Close() } +func (ei *EventIndex) isTipsetProcessed(ctx context.Context, tipsetKey []byte) (bool, error) { + row := ei.db.QueryRowContext(ctx, "SELECT COUNT(*) > 0 FROM events_seen WHERE tipset_key = ?", tipsetKey) + var exists bool + err := row.Scan(&exists) + return exists, err +} + func (ei *EventIndex) CollectEvents(ctx context.Context, te *TipSetEvents, revert bool, resolver func(ctx context.Context, emitter abi.ActorID, ts *types.TipSet) (address.Address, bool)) error { tx, err := ei.db.Begin() if err != nil { @@ -550,6 +641,10 @@ func (ei *EventIndex) CollectEvents(ctx context.Context, te *TipSetEvents, rever if err != nil { return xerrors.Errorf("load executed messages: %w", err) } + tsKeyCid, err := te.msgTs.Key().Cid() + if err != nil { + return xerrors.Errorf("tipset key cid: %w", err) + } eventCount := 0 // iterate over all executed messages in this tipset and insert them into the database if they @@ -567,11 +662,6 @@ func (ei *EventIndex) CollectEvents(ctx context.Context, te *TipSetEvents, rever addressLookups[ev.Emitter] = addr } - tsKeyCid, err := te.msgTs.Key().Cid() - if err != nil { - return xerrors.Errorf("tipset key cid: %w", err) - } - // check if this event already exists in the database var entryID sql.NullInt64 err = tx.Stmt(ei.stmtEventExists).QueryRow( @@ -651,6 +741,16 @@ func (ei *EventIndex) CollectEvents(ctx context.Context, te *TipSetEvents, rever } } + // add an entry to the event_seen table for this tipset + _, err = tx.Stmt(ei.stmtInsertEventsSeen).Exec( + te.msgTs.Height(), + te.msgTs.Key().Bytes(), + tsKeyCid.Bytes(), + ) + if err != nil { + return xerrors.Errorf("exec insert events seen: %w", err) + } + err = tx.Commit() if err != nil { return xerrors.Errorf("commit transaction: %w", err) diff --git a/chain/events/filter/index_test.go b/chain/events/filter/index_test.go index ce3f7b78a03..1aa69a59514 100644 --- a/chain/events/filter/index_test.go +++ b/chain/events/filter/index_test.go @@ -80,6 +80,11 @@ func TestEventIndexPrefillFilter(t *testing.T) { require.NoError(t, err, "collect events") } + tsKey := events14000.msgTs.Key() + seen, err := ei.isTipsetProcessed(context.Background(), tsKey.Bytes()) + require.NoError(t, err) + require.True(t, seen, "tipset key should be seen") + testCases := []struct { name string filter *eventFilter From 700684f7e8dca0fe5f8ea1443d472afa324594bb Mon Sep 17 00:00:00 2001 From: aarshkshah1992 Date: Wed, 12 Jun 2024 15:10:32 +0400 Subject: [PATCH 02/20] create correct index --- chain/events/filter/index.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/chain/events/filter/index.go b/chain/events/filter/index.go index 4e308fe0a83..6eab9db7ade 100644 --- a/chain/events/filter/index.go +++ b/chain/events/filter/index.go @@ -67,7 +67,7 @@ var ddls = []string{ createIndexEventEntryCodecValue, createIndexEventEntryEventId, createIndexEventsSeenHeight, - createIndexEventsSeenTipsetKeyCid, + createIndexEventsSeenTipsetKey, // metadata containing version of schema `CREATE TABLE IF NOT EXISTS _meta ( @@ -112,8 +112,8 @@ const ( tipset_key_cid BLOB NOT NULL )` - createIndexEventsSeenHeight = `CREATE INDEX IF NOT EXISTS events_seen_height ON events_seen (height);` - createIndexEventsSeenTipsetKeyCid = `CREATE INDEX IF NOT EXISTS events_seen_tipset_key_cid ON events_seen (tipset_key_cid);` + createIndexEventsSeenHeight = `CREATE INDEX IF NOT EXISTS events_seen_height ON events_seen (height);` + createIndexEventsSeenTipsetKey = `CREATE INDEX IF NOT EXISTS events_seen_tipset_key ON events_seen (tipset_key);` ) type EventIndex struct { @@ -451,9 +451,9 @@ func (ei *EventIndex) migrateToVersion6(ctx context.Context) error { if err != nil { return xerrors.Errorf("create index events_seen_height: %w", err) } - _, err = tx.ExecContext(ctx, createIndexEventsSeenTipsetKeyCid) + _, err = tx.ExecContext(ctx, createIndexEventsSeenTipsetKey) if err != nil { - return xerrors.Errorf("create index events_seen_tipset_key_cid: %w", err) + return xerrors.Errorf("create index events_seen_tipset_key: %w", err) } // INSERT an entry in the events_seen table for all epochs we do have have events for in our DB From bce9615015afee07d8fe6381a5b36f100169fa9d Mon Sep 17 00:00:00 2001 From: aarshkshah1992 Date: Wed, 12 Jun 2024 15:17:46 +0400 Subject: [PATCH 03/20] migrate to version 6 --- chain/events/filter/index.go | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/chain/events/filter/index.go b/chain/events/filter/index.go index 6eab9db7ade..4adc3574255 100644 --- a/chain/events/filter/index.go +++ b/chain/events/filter/index.go @@ -470,6 +470,11 @@ func (ei *EventIndex) migrateToVersion6(ctx context.Context) error { return xerrors.Errorf("insert events into events_seen: %w", err) } + _, err = tx.ExecContext(ctx, "INSERT OR IGNORE INTO _meta (version) VALUES (5)") + if err != nil { + return xerrors.Errorf("increment _meta version: %w", err) + } + err = tx.Commit() if err != nil { return xerrors.Errorf("commit transaction: %w", err) @@ -477,7 +482,7 @@ func (ei *EventIndex) migrateToVersion6(ctx context.Context) error { ei.vacuumDBAndCheckpointWAL(ctx) - log.Infof("Successfully migrated event index from version 4 to version 5 in %s", time.Since(now)) + log.Infof("Successfully migrated event index from version 5 to version 6 in %s", time.Since(now)) return nil } From 6448805c05c62173492f093a729f510e812e819d Mon Sep 17 00:00:00 2001 From: aarshkshah1992 Date: Wed, 12 Jun 2024 15:20:52 +0400 Subject: [PATCH 04/20] fix typo --- chain/events/filter/index.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/chain/events/filter/index.go b/chain/events/filter/index.go index 4adc3574255..fd109162cc3 100644 --- a/chain/events/filter/index.go +++ b/chain/events/filter/index.go @@ -456,7 +456,7 @@ func (ei *EventIndex) migrateToVersion6(ctx context.Context) error { return xerrors.Errorf("create index events_seen_tipset_key: %w", err) } - // INSERT an entry in the events_seen table for all epochs we do have have events for in our DB + // INSERT an entry in the events_seen table for all epochs we do have events for in our DB _, err = tx.ExecContext(ctx, ` INSERT INTO events_seen (height, tipset_key, tipset_key_cid) SELECT e.height, e.tipset_key, e.tipset_key_cid @@ -470,7 +470,7 @@ func (ei *EventIndex) migrateToVersion6(ctx context.Context) error { return xerrors.Errorf("insert events into events_seen: %w", err) } - _, err = tx.ExecContext(ctx, "INSERT OR IGNORE INTO _meta (version) VALUES (5)") + _, err = tx.ExecContext(ctx, "INSERT OR IGNORE INTO _meta (version) VALUES (6)") if err != nil { return xerrors.Errorf("increment _meta version: %w", err) } From 0ad4466a39a3d3ad0ef02c2a878a24854266711a Mon Sep 17 00:00:00 2001 From: aarshkshah1992 Date: Wed, 12 Jun 2024 15:39:36 +0400 Subject: [PATCH 05/20] test both conditions --- chain/events/filter/index_test.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/chain/events/filter/index_test.go b/chain/events/filter/index_test.go index 1aa69a59514..e49fce5990c 100644 --- a/chain/events/filter/index_test.go +++ b/chain/events/filter/index_test.go @@ -85,6 +85,10 @@ func TestEventIndexPrefillFilter(t *testing.T) { require.NoError(t, err) require.True(t, seen, "tipset key should be seen") + seen, err = ei.isTipsetProcessed(context.Background(), []byte{1}) + require.NoError(t, err) + require.False(t, seen, "tipset key should not be seen") + testCases := []struct { name string filter *eventFilter From 5678feb8938379be4ab2a7218025a2fa8de6e8f6 Mon Sep 17 00:00:00 2001 From: aarshkshah1992 Date: Thu, 13 Jun 2024 15:26:49 +0400 Subject: [PATCH 06/20] changes as per review --- chain/events/filter/index.go | 20 ++++++++------------ chain/events/filter/index_test.go | 5 ++++- 2 files changed, 12 insertions(+), 13 deletions(-) diff --git a/chain/events/filter/index.go b/chain/events/filter/index.go index fd109162cc3..450f8dd5aa6 100644 --- a/chain/events/filter/index.go +++ b/chain/events/filter/index.go @@ -67,7 +67,7 @@ var ddls = []string{ createIndexEventEntryCodecValue, createIndexEventEntryEventId, createIndexEventsSeenHeight, - createIndexEventsSeenTipsetKey, + createIndexEventsSeenTipsetKeyCid, // metadata containing version of schema `CREATE TABLE IF NOT EXISTS _meta ( @@ -94,7 +94,7 @@ const ( insertEntry = `INSERT OR IGNORE INTO event_entry(event_id, indexed, flags, key, codec, value) VALUES(?, ?, ?, ?, ?, ?)` revertEventsInTipset = `UPDATE event SET reverted=true WHERE height=? AND tipset_key=?` restoreEvent = `UPDATE event SET reverted=false WHERE height=? AND tipset_key=? AND tipset_key_cid=? AND emitter_addr=? AND event_index=? AND message_cid=? AND message_index=?` - insertEventsSeen = `INSERT OR IGNORE INTO events_seen(height, tipset_key, tipset_key_cid) VALUES(?, ?, ?)` + insertEventsSeen = `INSERT OR IGNORE INTO events_seen(height, tipset_key_cid) VALUES(?, ?)` createIndexEventEmitterAddr = `CREATE INDEX IF NOT EXISTS event_emitter_addr ON event (emitter_addr)` createIndexEventTipsetKeyCid = `CREATE INDEX IF NOT EXISTS event_tipset_key_cid ON event (tipset_key_cid);` @@ -108,12 +108,11 @@ const ( createTableEventsSeen = `CREATE TABLE IF NOT EXISTS events_seen ( id INTEGER PRIMARY KEY, height INTEGER NOT NULL, - tipset_key BLOB NOT NULL, tipset_key_cid BLOB NOT NULL )` - createIndexEventsSeenHeight = `CREATE INDEX IF NOT EXISTS events_seen_height ON events_seen (height);` - createIndexEventsSeenTipsetKey = `CREATE INDEX IF NOT EXISTS events_seen_tipset_key ON events_seen (tipset_key);` + createIndexEventsSeenHeight = `CREATE INDEX IF NOT EXISTS events_seen_height ON events_seen (height);` + createIndexEventsSeenTipsetKeyCid = `CREATE INDEX IF NOT EXISTS events_seen_tipset_key_cid ON events_seen (tipset_key_cid);` ) type EventIndex struct { @@ -423,8 +422,6 @@ func (ei *EventIndex) migrateToVersion5(ctx context.Context) error { return xerrors.Errorf("commit transaction: %w", err) } - ei.vacuumDBAndCheckpointWAL(ctx) - log.Infof("Successfully migrated event index from version 4 to version 5 in %s", time.Since(now)) return nil } @@ -451,9 +448,9 @@ func (ei *EventIndex) migrateToVersion6(ctx context.Context) error { if err != nil { return xerrors.Errorf("create index events_seen_height: %w", err) } - _, err = tx.ExecContext(ctx, createIndexEventsSeenTipsetKey) + _, err = tx.ExecContext(ctx, createIndexEventsSeenTipsetKeyCid) if err != nil { - return xerrors.Errorf("create index events_seen_tipset_key: %w", err) + return xerrors.Errorf("create index events_seen_tipset_key_cid: %w", err) } // INSERT an entry in the events_seen table for all epochs we do have events for in our DB @@ -609,8 +606,8 @@ func (ei *EventIndex) Close() error { return ei.db.Close() } -func (ei *EventIndex) isTipsetProcessed(ctx context.Context, tipsetKey []byte) (bool, error) { - row := ei.db.QueryRowContext(ctx, "SELECT COUNT(*) > 0 FROM events_seen WHERE tipset_key = ?", tipsetKey) +func (ei *EventIndex) isTipsetProcessed(ctx context.Context, tipsetKeyCid []byte) (bool, error) { + row := ei.db.QueryRowContext(ctx, "SELECT COUNT(*) > 0 FROM events_seen WHERE tipset_key_cid = ?", tipsetKeyCid) var exists bool err := row.Scan(&exists) return exists, err @@ -749,7 +746,6 @@ func (ei *EventIndex) CollectEvents(ctx context.Context, te *TipSetEvents, rever // add an entry to the event_seen table for this tipset _, err = tx.Stmt(ei.stmtInsertEventsSeen).Exec( te.msgTs.Height(), - te.msgTs.Key().Bytes(), tsKeyCid.Bytes(), ) if err != nil { diff --git a/chain/events/filter/index_test.go b/chain/events/filter/index_test.go index e49fce5990c..f9a1da912dc 100644 --- a/chain/events/filter/index_test.go +++ b/chain/events/filter/index_test.go @@ -81,7 +81,10 @@ func TestEventIndexPrefillFilter(t *testing.T) { } tsKey := events14000.msgTs.Key() - seen, err := ei.isTipsetProcessed(context.Background(), tsKey.Bytes()) + tsKeyCid, err := tsKey.Cid() + require.NoError(t, err, "tipset key cid") + + seen, err := ei.isTipsetProcessed(context.Background(), tsKeyCid.Bytes()) require.NoError(t, err) require.True(t, seen, "tipset key should be seen") From 736eeb43879a4de0481997c6b9914b8cff1309d1 Mon Sep 17 00:00:00 2001 From: aarshkshah1992 Date: Thu, 13 Jun 2024 18:43:55 +0400 Subject: [PATCH 07/20] record reverted tipsets --- chain/events/filter/index.go | 52 ++++++++++++++++++++++++++++++++---- 1 file changed, 47 insertions(+), 5 deletions(-) diff --git a/chain/events/filter/index.go b/chain/events/filter/index.go index 450f8dd5aa6..cd08d997fa8 100644 --- a/chain/events/filter/index.go +++ b/chain/events/filter/index.go @@ -94,7 +94,9 @@ const ( insertEntry = `INSERT OR IGNORE INTO event_entry(event_id, indexed, flags, key, codec, value) VALUES(?, ?, ?, ?, ?, ?)` revertEventsInTipset = `UPDATE event SET reverted=true WHERE height=? AND tipset_key=?` restoreEvent = `UPDATE event SET reverted=false WHERE height=? AND tipset_key=? AND tipset_key_cid=? AND emitter_addr=? AND event_index=? AND message_cid=? AND message_index=?` - insertEventsSeen = `INSERT OR IGNORE INTO events_seen(height, tipset_key_cid) VALUES(?, ?)` + revertEventSeen = `UPDATE events_seen SET reverted=true WHERE height=? AND tipset_key_cid=?` + restoreEventSeen = `UPDATE events_seen SET reverted=false WHERE height=? AND tipset_key_cid=?` + insertEventsSeen = `INSERT OR IGNORE INTO events_seen(height, tipset_key_cid, reverted) VALUES(?, ?, ?)` createIndexEventEmitterAddr = `CREATE INDEX IF NOT EXISTS event_emitter_addr ON event (emitter_addr)` createIndexEventTipsetKeyCid = `CREATE INDEX IF NOT EXISTS event_tipset_key_cid ON event (tipset_key_cid);` @@ -108,7 +110,8 @@ const ( createTableEventsSeen = `CREATE TABLE IF NOT EXISTS events_seen ( id INTEGER PRIMARY KEY, height INTEGER NOT NULL, - tipset_key_cid BLOB NOT NULL + tipset_key_cid BLOB NOT NULL, + reverted INTEGER NOT NULL )` createIndexEventsSeenHeight = `CREATE INDEX IF NOT EXISTS events_seen_height ON events_seen (height);` @@ -124,6 +127,8 @@ type EventIndex struct { stmtRevertEventsInTipset *sql.Stmt stmtRestoreEvent *sql.Stmt stmtInsertEventsSeen *sql.Stmt + stmtRevertEventSeen *sql.Stmt + stmtRestoreEventSeen *sql.Stmt } func (ei *EventIndex) initStatements() (err error) { @@ -157,6 +162,16 @@ func (ei *EventIndex) initStatements() (err error) { return xerrors.Errorf("prepare stmtInsertEventsSeen: %w", err) } + ei.stmtRevertEventSeen, err = ei.db.Prepare(revertEventSeen) + if err != nil { + return xerrors.Errorf("prepare stmtRevertEventSeen: %w", err) + } + + ei.stmtRestoreEventSeen, err = ei.db.Prepare(restoreEventSeen) + if err != nil { + return xerrors.Errorf("prepare stmtRestoreEventSeen: %w", err) + } + return nil } @@ -455,12 +470,12 @@ func (ei *EventIndex) migrateToVersion6(ctx context.Context) error { // INSERT an entry in the events_seen table for all epochs we do have events for in our DB _, err = tx.ExecContext(ctx, ` - INSERT INTO events_seen (height, tipset_key, tipset_key_cid) - SELECT e.height, e.tipset_key, e.tipset_key_cid + INSERT INTO events_seen (height, tipset_key_cid, reverted) + SELECT e.height, e.tipset_key_cid, e.reverted FROM event e WHERE NOT EXISTS ( SELECT 1 FROM events_seen es - WHERE es.height = e.height AND es.tipset_key = e.tipset_key AND es.tipset_key_cid = e.tipset_key_cid + WHERE es.height = e.height AND es.tipset_key_cid = e.tipset_key_cid ) `) if err != nil { @@ -623,11 +638,21 @@ func (ei *EventIndex) CollectEvents(ctx context.Context, te *TipSetEvents, rever // lets handle the revert case first, since its simpler and we can simply mark all events in this tipset as reverted and return if revert { + tsKeyCid, err := te.msgTs.Key().Cid() + if err != nil { + return xerrors.Errorf("tipset key cid: %w", err) + } + _, err = tx.Stmt(ei.stmtRevertEventsInTipset).Exec(te.msgTs.Height(), te.msgTs.Key().Bytes()) if err != nil { return xerrors.Errorf("revert event: %w", err) } + _, err = tx.Stmt(ei.stmtRevertEventSeen).Exec(te.msgTs.Height(), tsKeyCid.Bytes()) + if err != nil { + return xerrors.Errorf("revert event seen: %w", err) + } + err = tx.Commit() if err != nil { return xerrors.Errorf("commit transaction: %w", err) @@ -738,6 +763,22 @@ func (ei *EventIndex) CollectEvents(ctx context.Context, te *TipSetEvents, rever if rowsAffected != 1 { log.Warnf("restored %d events but expected only one to exist", rowsAffected) } + + res, err = tx.Stmt(ei.stmtRestoreEventSeen).Exec( + te.msgTs.Height(), + tsKeyCid.Bytes(), + ) + if err != nil { + return xerrors.Errorf("exec restore event seen: %w", err) + } + + rowsAffected, err = res.RowsAffected() + if err != nil { + return xerrors.Errorf("error getting rows affected: %s", err) + } + if rowsAffected != 1 { + log.Warnf("restored %d events in events_seen but expected only one to exist", rowsAffected) + } } eventCount++ } @@ -747,6 +788,7 @@ func (ei *EventIndex) CollectEvents(ctx context.Context, te *TipSetEvents, rever _, err = tx.Stmt(ei.stmtInsertEventsSeen).Exec( te.msgTs.Height(), tsKeyCid.Bytes(), + false, ) if err != nil { return xerrors.Errorf("exec insert events seen: %w", err) From 65ae9208e6341c8c238c5979ea52ee95ab9460d3 Mon Sep 17 00:00:00 2001 From: aarshkshah1992 Date: Thu, 13 Jun 2024 18:52:33 +0400 Subject: [PATCH 08/20] see if tipsets has events and has not been reverted --- chain/events/filter/index.go | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/chain/events/filter/index.go b/chain/events/filter/index.go index cd08d997fa8..c81c6f59ee7 100644 --- a/chain/events/filter/index.go +++ b/chain/events/filter/index.go @@ -475,7 +475,7 @@ func (ei *EventIndex) migrateToVersion6(ctx context.Context) error { FROM event e WHERE NOT EXISTS ( SELECT 1 FROM events_seen es - WHERE es.height = e.height AND es.tipset_key_cid = e.tipset_key_cid + WHERE es.height = e.height AND es.tipset_key_cid = e.tipset_key_cid ) `) if err != nil { @@ -628,6 +628,13 @@ func (ei *EventIndex) isTipsetProcessed(ctx context.Context, tipsetKeyCid []byte return exists, err } +func (ei *EventIndex) isTipsetEventsApplied(ctx context.Context, tipsetKeyCid []byte) (bool, error) { + row := ei.db.QueryRowContext(ctx, "SELECT COUNT(*) > 0 FROM events_seen WHERE tipset_key_cid = ? AND reverted = ?", tipsetKeyCid, false) + var exists bool + err := row.Scan(&exists) + return exists, err +} + func (ei *EventIndex) CollectEvents(ctx context.Context, te *TipSetEvents, revert bool, resolver func(ctx context.Context, emitter abi.ActorID, ts *types.TipSet) (address.Address, bool)) error { tx, err := ei.db.Begin() if err != nil { From e4cf426c1b48c1ba9eeb2d04d1b33906756b6d87 Mon Sep 17 00:00:00 2001 From: aarshkshah1992 Date: Fri, 14 Jun 2024 12:08:09 +0400 Subject: [PATCH 09/20] sub/unsub tipset updates from the index --- chain/events/filter/index.go | 61 ++++++++++++++++++++++++++++++++++++ 1 file changed, 61 insertions(+) diff --git a/chain/events/filter/index.go b/chain/events/filter/index.go index c81c6f59ee7..e5d4373c986 100644 --- a/chain/events/filter/index.go +++ b/chain/events/filter/index.go @@ -7,6 +7,7 @@ import ( "fmt" "sort" "strings" + "sync" "time" "github.com/ipfs/go-cid" @@ -129,6 +130,16 @@ type EventIndex struct { stmtInsertEventsSeen *sql.Stmt stmtRevertEventSeen *sql.Stmt stmtRestoreEventSeen *sql.Stmt + + mu sync.Mutex + subIdCounter uint64 + subscribedTipsets map[uint64]chan tipsetUpdate +} + +type tipsetUpdate struct { + height uint64 + tipsetCid cid.Cid + revert bool } func (ei *EventIndex) initStatements() (err error) { @@ -611,6 +622,8 @@ func NewEventIndex(ctx context.Context, path string, chainStore *store.ChainStor return nil, xerrors.Errorf("error preparing eventIndex database statements: %w", err) } + eventIndex.subscribedTipsets = make(map[uint64]chan tipsetUpdate) + return &eventIndex, nil } @@ -621,6 +634,24 @@ func (ei *EventIndex) Close() error { return ei.db.Close() } +func (ei *EventIndex) subscribeTipsetUpdates() (uint64, chan<- tipsetUpdate) { + ch := make(chan tipsetUpdate) + + ei.mu.Lock() + subId := ei.subIdCounter + ei.subIdCounter++ + ei.subscribedTipsets[subId] = ch + ei.mu.Unlock() + + return subId, ch +} + +func (ei *EventIndex) unsubscribeTipsetUpdates(subId uint64) { + ei.mu.Lock() + delete(ei.subscribedTipsets, subId) + ei.mu.Unlock() +} + func (ei *EventIndex) isTipsetProcessed(ctx context.Context, tipsetKeyCid []byte) (bool, error) { row := ei.db.QueryRowContext(ctx, "SELECT COUNT(*) > 0 FROM events_seen WHERE tipset_key_cid = ?", tipsetKeyCid) var exists bool @@ -665,6 +696,21 @@ func (ei *EventIndex) CollectEvents(ctx context.Context, te *TipSetEvents, rever return xerrors.Errorf("commit transaction: %w", err) } + ei.mu.Lock() + for _, ch := range ei.subscribedTipsets { + select { + case ch <- tipsetUpdate{ + height: uint64(te.msgTs.Height()), + tipsetCid: tsKeyCid, + revert: true, + }: + case <-ctx.Done(): + return ctx.Err() + default: + } + } + ei.mu.Unlock() + return nil } @@ -806,6 +852,21 @@ func (ei *EventIndex) CollectEvents(ctx context.Context, te *TipSetEvents, rever return xerrors.Errorf("commit transaction: %w", err) } + ei.mu.Lock() + for _, ch := range ei.subscribedTipsets { + select { + case ch <- tipsetUpdate{ + height: uint64(te.msgTs.Height()), + tipsetCid: tsKeyCid, + revert: false, + }: + case <-ctx.Done(): + return ctx.Err() + default: + } + } + ei.mu.Unlock() + return nil } From 242164496862ded63db6947028ebfc4c5872afa5 Mon Sep 17 00:00:00 2001 From: aarshkshah1992 Date: Fri, 14 Jun 2024 16:35:15 +0400 Subject: [PATCH 10/20] eth_get_logs should wait for events --- chain/events/filter/index.go | 101 ++++++++++++++++++++---------- chain/events/filter/index_test.go | 59 ++++++++++++++++- node/impl/full/eth.go | 95 ++++++++++++++++++++++++++-- 3 files changed, 216 insertions(+), 39 deletions(-) diff --git a/chain/events/filter/index.go b/chain/events/filter/index.go index e5d4373c986..53bdd6f187c 100644 --- a/chain/events/filter/index.go +++ b/chain/events/filter/index.go @@ -133,13 +133,19 @@ type EventIndex struct { mu sync.Mutex subIdCounter uint64 - subscribedTipsets map[uint64]chan tipsetUpdate + subscribedTipsets map[uint64]*tipsetSub } -type tipsetUpdate struct { - height uint64 - tipsetCid cid.Cid - revert bool +type tipsetSub struct { + ctx context.Context + ch chan *TipsetUpdate + cancel context.CancelFunc +} + +type TipsetUpdate struct { + Height uint64 + TipsetCid cid.Cid + Reverted bool } func (ei *EventIndex) initStatements() (err error) { @@ -622,7 +628,7 @@ func NewEventIndex(ctx context.Context, path string, chainStore *store.ChainStor return nil, xerrors.Errorf("error preparing eventIndex database statements: %w", err) } - eventIndex.subscribedTipsets = make(map[uint64]chan tipsetUpdate) + eventIndex.subscribedTipsets = make(map[uint64]*tipsetSub) return &eventIndex, nil } @@ -634,33 +640,56 @@ func (ei *EventIndex) Close() error { return ei.db.Close() } -func (ei *EventIndex) subscribeTipsetUpdates() (uint64, chan<- tipsetUpdate) { - ch := make(chan tipsetUpdate) +func (ei *EventIndex) SubscribeTipsetUpdates() (uint64, chan *TipsetUpdate) { + subCtx, subCancel := context.WithCancel(context.Background()) + ch := make(chan *TipsetUpdate) + + tSub := &tipsetSub{ + ctx: subCtx, + cancel: subCancel, + ch: ch, + } ei.mu.Lock() subId := ei.subIdCounter ei.subIdCounter++ - ei.subscribedTipsets[subId] = ch + ei.subscribedTipsets[subId] = tSub ei.mu.Unlock() - return subId, ch + return subId, tSub.ch } -func (ei *EventIndex) unsubscribeTipsetUpdates(subId uint64) { +func (ei *EventIndex) UnsubscribeTipsetUpdates(subId uint64) { ei.mu.Lock() + tSub, ok := ei.subscribedTipsets[subId] + if !ok { + ei.mu.Unlock() + return + } delete(ei.subscribedTipsets, subId) ei.mu.Unlock() + + // cancel the subscription + tSub.cancel() } -func (ei *EventIndex) isTipsetProcessed(ctx context.Context, tipsetKeyCid []byte) (bool, error) { - row := ei.db.QueryRowContext(ctx, "SELECT COUNT(*) > 0 FROM events_seen WHERE tipset_key_cid = ?", tipsetKeyCid) - var exists bool - err := row.Scan(&exists) - return exists, err +func (ei *EventIndex) GetMaxHeightInIndex(ctx context.Context) (uint64, error) { + row := ei.db.QueryRowContext(ctx, "SELECT MAX(height) FROM events_seen") + var maxHeight uint64 + err := row.Scan(&maxHeight) + return maxHeight, err } -func (ei *EventIndex) isTipsetEventsApplied(ctx context.Context, tipsetKeyCid []byte) (bool, error) { - row := ei.db.QueryRowContext(ctx, "SELECT COUNT(*) > 0 FROM events_seen WHERE tipset_key_cid = ? AND reverted = ?", tipsetKeyCid, false) +func (ei *EventIndex) IsHeightProcessed(ctx context.Context, height uint64) (bool, error) { + mh, err := ei.GetMaxHeightInIndex(ctx) + if err != nil { + return false, err + } + return height <= mh, nil +} + +func (ei *EventIndex) IsTipsetProcessed(ctx context.Context, tipsetKeyCid []byte) (bool, error) { + row := ei.db.QueryRowContext(ctx, "SELECT COUNT(*) > 0 FROM events_seen WHERE tipset_key_cid = ?", tipsetKeyCid) var exists bool err := row.Scan(&exists) return exists, err @@ -697,19 +726,23 @@ func (ei *EventIndex) CollectEvents(ctx context.Context, te *TipSetEvents, rever } ei.mu.Lock() - for _, ch := range ei.subscribedTipsets { + tSubs := ei.subscribedTipsets + ei.mu.Unlock() + + for _, tSub := range tSubs { + tSub := tSub select { - case ch <- tipsetUpdate{ - height: uint64(te.msgTs.Height()), - tipsetCid: tsKeyCid, - revert: true, + case tSub.ch <- &TipsetUpdate{ + Height: uint64(te.msgTs.Height()), + TipsetCid: tsKeyCid, + Reverted: true, }: + case <-tSub.ctx.Done(): + // subscription was cancelled, ignore case <-ctx.Done(): return ctx.Err() - default: } } - ei.mu.Unlock() return nil } @@ -853,19 +886,23 @@ func (ei *EventIndex) CollectEvents(ctx context.Context, te *TipSetEvents, rever } ei.mu.Lock() - for _, ch := range ei.subscribedTipsets { + tSubs := ei.subscribedTipsets + ei.mu.Unlock() + + for _, tSub := range tSubs { + tSub := tSub select { - case ch <- tipsetUpdate{ - height: uint64(te.msgTs.Height()), - tipsetCid: tsKeyCid, - revert: false, + case tSub.ch <- &TipsetUpdate{ + Height: uint64(te.msgTs.Height()), + TipsetCid: tsKeyCid, + Reverted: false, }: + case <-tSub.ctx.Done(): + // subscription was cancelled, ignore case <-ctx.Done(): return ctx.Err() - default: } } - ei.mu.Unlock() return nil } diff --git a/chain/events/filter/index_test.go b/chain/events/filter/index_test.go index f9a1da912dc..29370b82274 100644 --- a/chain/events/filter/index_test.go +++ b/chain/events/filter/index_test.go @@ -76,22 +76,47 @@ func TestEventIndexPrefillFilter(t *testing.T) { ei, err := NewEventIndex(context.Background(), dbPath, nil) require.NoError(t, err, "create event index") + + _, subCh := ei.SubscribeTipsetUpdates() + out := make(chan *TipsetUpdate, 1) + go func() { + tu := <-subCh + out <- tu + }() + if err := ei.CollectEvents(context.Background(), events14000, false, addrMap.ResolveAddress); err != nil { require.NoError(t, err, "collect events") } + mh, err := ei.GetMaxHeightInIndex(context.Background()) + require.NoError(t, err) + require.Equal(t, uint64(14000), mh) + + b, err := ei.IsHeightProcessed(context.Background(), 14000) + require.NoError(t, err) + require.True(t, b) + + b, err = ei.IsHeightProcessed(context.Background(), 14001) + require.NoError(t, err) + require.False(t, b) + tsKey := events14000.msgTs.Key() tsKeyCid, err := tsKey.Cid() require.NoError(t, err, "tipset key cid") - seen, err := ei.isTipsetProcessed(context.Background(), tsKeyCid.Bytes()) + seen, err := ei.IsTipsetProcessed(context.Background(), tsKeyCid.Bytes()) require.NoError(t, err) require.True(t, seen, "tipset key should be seen") - seen, err = ei.isTipsetProcessed(context.Background(), []byte{1}) + seen, err = ei.IsTipsetProcessed(context.Background(), []byte{1}) require.NoError(t, err) require.False(t, seen, "tipset key should not be seen") + tu := <-out + require.Equal(t, uint64(14000), tu.Height) + require.EqualValues(t, tsKeyCid, tu.TipsetCid) + require.False(t, tu.Reverted) + testCases := []struct { name string filter *eventFilter @@ -409,6 +434,21 @@ func TestEventIndexPrefillFilterExcludeReverted(t *testing.T) { ei, err := NewEventIndex(context.Background(), dbPath, nil) require.NoError(t, err, "create event index") + + tCh := make(chan *TipsetUpdate, 3) + _, subCh := ei.SubscribeTipsetUpdates() + go func() { + cnt := 0 + for tu := range subCh { + tCh <- tu + cnt++ + if cnt == 3 { + close(tCh) + return + } + } + }() + if err := ei.CollectEvents(context.Background(), revertedEvents14000, false, addrMap.ResolveAddress); err != nil { require.NoError(t, err, "collect reverted events") } @@ -419,6 +459,21 @@ func TestEventIndexPrefillFilterExcludeReverted(t *testing.T) { require.NoError(t, err, "collect events") } + tu1 := <-tCh + require.EqualValues(t, 14000, tu1.Height) + require.False(t, tu1.Reverted) + require.EqualValues(t, reveredCID14000, tu1.TipsetCid) + + tu2 := <-tCh + require.EqualValues(t, 14000, tu2.Height) + require.True(t, tu2.Reverted) + require.EqualValues(t, reveredCID14000, tu2.TipsetCid) + + tu3 := <-tCh + require.EqualValues(t, 14000, tu3.Height) + require.False(t, tu3.Reverted) + require.EqualValues(t, cid14000, tu3.TipsetCid) + inclusiveTestCases := []struct { name string filter *eventFilter diff --git a/node/impl/full/eth.go b/node/impl/full/eth.go index 82f272c6cff..a33548acd5e 100644 --- a/node/impl/full/eth.go +++ b/node/impl/full/eth.go @@ -1258,10 +1258,47 @@ func (e *EthEventHandler) EthGetLogs(ctx context.Context, filterSpec *ethtypes.E return nil, api.ErrNotSupported } + if e.EventFilterManager.EventIndex == nil { + return nil, xerrors.Errorf("cannot use eth_get_logs if historical event index is disabled") + } + + pf, err := e.parseEthFilterSpec(ctx, filterSpec) + if err != nil { + return nil, xerrors.Errorf("failed to parse eth filter spec: %w", err) + } + + if pf.tipsetCid == cid.Undef { + maxHeight := pf.maxHeight + if maxHeight == -1 { + maxHeight = e.Chain.GetHeaviestTipSet().Height() + } + err := e.waitForHeightProcessed(ctx, maxHeight) + if err != nil { + return nil, err + } + } else { + ts, err := e.Chain.GetTipSetByCid(ctx, pf.tipsetCid) + if err != nil { + return nil, xerrors.Errorf("failed to get tipset by cid: %w", err) + } + err = e.waitForHeightProcessed(ctx, ts.Height()) + if err != nil { + return nil, err + } + + b, err := e.EventFilterManager.EventIndex.IsTipsetProcessed(ctx, pf.tipsetCid.Bytes()) + if err != nil { + return nil, xerrors.Errorf("failed to check if tipset events have been indexed: %w", err) + } + if !b { + return nil, xerrors.Errorf("event index failed to index tipset %s", pf.tipsetCid.String()) + } + } + // Create a temporary filter - f, err := e.installEthFilterSpec(ctx, filterSpec) + f, err := e.EventFilterManager.Install(ctx, pf.minHeight, pf.maxHeight, pf.tipsetCid, pf.addresses, pf.keys, true) if err != nil { - return nil, err + return nil, xerrors.Errorf("failed to install event filter: %w", err) } ces := f.TakeCollectedEvents(ctx) @@ -1270,6 +1307,35 @@ func (e *EthEventHandler) EthGetLogs(ctx context.Context, filterSpec *ethtypes.E return ethFilterResultFromEvents(ctx, ces, e.SubManager.StateAPI) } +func (e *EthEventHandler) waitForHeightProcessed(ctx context.Context, height abi.ChainEpoch) error { + ei := e.EventFilterManager.EventIndex + b, err := ei.IsHeightProcessed(ctx, uint64(height)) + if err != nil { + return xerrors.Errorf("failed to check if event index has events for given height: %w", err) + } + if b { + return nil + } + + if height > e.Chain.GetHeaviestTipSet().Height() { + return xerrors.New("height is in the future") + } + + subId, subCh := ei.SubscribeTipsetUpdates() + defer ei.UnsubscribeTipsetUpdates(subId) + + for { + select { + case tu := <-subCh: + if tu.Height >= uint64(height) { + return nil + } + case <-ctx.Done(): + return ctx.Err() + } + } +} + func (e *EthEventHandler) EthGetFilterChanges(ctx context.Context, id ethtypes.EthFilterID) (*ethtypes.EthFilterResult, error) { if e.FilterStore == nil { return nil, api.ErrNotSupported @@ -1368,7 +1434,15 @@ func parseBlockRange(heaviest abi.ChainEpoch, fromBlock, toBlock *string, maxRan return minHeight, maxHeight, nil } -func (e *EthEventHandler) installEthFilterSpec(ctx context.Context, filterSpec *ethtypes.EthFilterSpec) (filter.EventFilter, error) { +type parsedFilter struct { + minHeight abi.ChainEpoch + maxHeight abi.ChainEpoch + tipsetCid cid.Cid + addresses []address.Address + keys map[string][]types.ActorEventBlock +} + +func (e *EthEventHandler) parseEthFilterSpec(ctx context.Context, filterSpec *ethtypes.EthFilterSpec) (*parsedFilter, error) { var ( minHeight abi.ChainEpoch maxHeight abi.ChainEpoch @@ -1405,7 +1479,13 @@ func (e *EthEventHandler) installEthFilterSpec(ctx context.Context, filterSpec * return nil, err } - return e.EventFilterManager.Install(ctx, minHeight, maxHeight, tipsetCid, addresses, keysToKeysWithCodec(keys), true) + return &parsedFilter{ + minHeight: minHeight, + maxHeight: maxHeight, + tipsetCid: tipsetCid, + addresses: addresses, + keys: keysToKeysWithCodec(keys), + }, nil } func keysToKeysWithCodec(keys map[string][][]byte) map[string][]types.ActorEventBlock { @@ -1426,11 +1506,16 @@ func (e *EthEventHandler) EthNewFilter(ctx context.Context, filterSpec *ethtypes return ethtypes.EthFilterID{}, api.ErrNotSupported } - f, err := e.installEthFilterSpec(ctx, filterSpec) + pf, err := e.parseEthFilterSpec(ctx, filterSpec) if err != nil { return ethtypes.EthFilterID{}, err } + f, err := e.EventFilterManager.Install(ctx, pf.minHeight, pf.maxHeight, pf.tipsetCid, pf.addresses, pf.keys, true) + if err != nil { + return ethtypes.EthFilterID{}, xerrors.Errorf("failed to install event filter: %w", err) + } + if err := e.FilterStore.Add(ctx, f); err != nil { // Could not record in store, attempt to delete filter to clean up err2 := e.TipSetFilterManager.Remove(ctx, f.ID()) From 67fbe6a96262449f355fb7dd31e0f7969790c164 Mon Sep 17 00:00:00 2001 From: aarshkshah1992 Date: Fri, 14 Jun 2024 16:53:38 +0400 Subject: [PATCH 11/20] fix naming --- chain/events/filter/index.go | 36 +++++++++++++++---------------- chain/events/filter/index_test.go | 8 +++---- node/impl/full/eth.go | 4 ++-- 3 files changed, 24 insertions(+), 24 deletions(-) diff --git a/chain/events/filter/index.go b/chain/events/filter/index.go index 53bdd6f187c..4329297d175 100644 --- a/chain/events/filter/index.go +++ b/chain/events/filter/index.go @@ -131,18 +131,18 @@ type EventIndex struct { stmtRevertEventSeen *sql.Stmt stmtRestoreEventSeen *sql.Stmt - mu sync.Mutex - subIdCounter uint64 - subscribedTipsets map[uint64]*tipsetSub + mu sync.Mutex + subIdCounter uint64 + updateSubs map[uint64]*updateSub } -type tipsetSub struct { +type updateSub struct { ctx context.Context - ch chan *TipsetUpdate + ch chan *EventIndexUpdate cancel context.CancelFunc } -type TipsetUpdate struct { +type EventIndexUpdate struct { Height uint64 TipsetCid cid.Cid Reverted bool @@ -628,7 +628,7 @@ func NewEventIndex(ctx context.Context, path string, chainStore *store.ChainStor return nil, xerrors.Errorf("error preparing eventIndex database statements: %w", err) } - eventIndex.subscribedTipsets = make(map[uint64]*tipsetSub) + eventIndex.updateSubs = make(map[uint64]*updateSub) return &eventIndex, nil } @@ -640,11 +640,11 @@ func (ei *EventIndex) Close() error { return ei.db.Close() } -func (ei *EventIndex) SubscribeTipsetUpdates() (uint64, chan *TipsetUpdate) { +func (ei *EventIndex) SubscribeUpdates() (uint64, chan *EventIndexUpdate) { subCtx, subCancel := context.WithCancel(context.Background()) - ch := make(chan *TipsetUpdate) + ch := make(chan *EventIndexUpdate) - tSub := &tipsetSub{ + tSub := &updateSub{ ctx: subCtx, cancel: subCancel, ch: ch, @@ -653,20 +653,20 @@ func (ei *EventIndex) SubscribeTipsetUpdates() (uint64, chan *TipsetUpdate) { ei.mu.Lock() subId := ei.subIdCounter ei.subIdCounter++ - ei.subscribedTipsets[subId] = tSub + ei.updateSubs[subId] = tSub ei.mu.Unlock() return subId, tSub.ch } -func (ei *EventIndex) UnsubscribeTipsetUpdates(subId uint64) { +func (ei *EventIndex) UnsubscribeUpdates(subId uint64) { ei.mu.Lock() - tSub, ok := ei.subscribedTipsets[subId] + tSub, ok := ei.updateSubs[subId] if !ok { ei.mu.Unlock() return } - delete(ei.subscribedTipsets, subId) + delete(ei.updateSubs, subId) ei.mu.Unlock() // cancel the subscription @@ -726,13 +726,13 @@ func (ei *EventIndex) CollectEvents(ctx context.Context, te *TipSetEvents, rever } ei.mu.Lock() - tSubs := ei.subscribedTipsets + tSubs := ei.updateSubs ei.mu.Unlock() for _, tSub := range tSubs { tSub := tSub select { - case tSub.ch <- &TipsetUpdate{ + case tSub.ch <- &EventIndexUpdate{ Height: uint64(te.msgTs.Height()), TipsetCid: tsKeyCid, Reverted: true, @@ -886,13 +886,13 @@ func (ei *EventIndex) CollectEvents(ctx context.Context, te *TipSetEvents, rever } ei.mu.Lock() - tSubs := ei.subscribedTipsets + tSubs := ei.updateSubs ei.mu.Unlock() for _, tSub := range tSubs { tSub := tSub select { - case tSub.ch <- &TipsetUpdate{ + case tSub.ch <- &EventIndexUpdate{ Height: uint64(te.msgTs.Height()), TipsetCid: tsKeyCid, Reverted: false, diff --git a/chain/events/filter/index_test.go b/chain/events/filter/index_test.go index 29370b82274..10b1648b13f 100644 --- a/chain/events/filter/index_test.go +++ b/chain/events/filter/index_test.go @@ -77,8 +77,8 @@ func TestEventIndexPrefillFilter(t *testing.T) { ei, err := NewEventIndex(context.Background(), dbPath, nil) require.NoError(t, err, "create event index") - _, subCh := ei.SubscribeTipsetUpdates() - out := make(chan *TipsetUpdate, 1) + _, subCh := ei.SubscribeUpdates() + out := make(chan *EventIndexUpdate, 1) go func() { tu := <-subCh out <- tu @@ -435,8 +435,8 @@ func TestEventIndexPrefillFilterExcludeReverted(t *testing.T) { ei, err := NewEventIndex(context.Background(), dbPath, nil) require.NoError(t, err, "create event index") - tCh := make(chan *TipsetUpdate, 3) - _, subCh := ei.SubscribeTipsetUpdates() + tCh := make(chan *EventIndexUpdate, 3) + _, subCh := ei.SubscribeUpdates() go func() { cnt := 0 for tu := range subCh { diff --git a/node/impl/full/eth.go b/node/impl/full/eth.go index a33548acd5e..443f56d440c 100644 --- a/node/impl/full/eth.go +++ b/node/impl/full/eth.go @@ -1321,8 +1321,8 @@ func (e *EthEventHandler) waitForHeightProcessed(ctx context.Context, height abi return xerrors.New("height is in the future") } - subId, subCh := ei.SubscribeTipsetUpdates() - defer ei.UnsubscribeTipsetUpdates(subId) + subId, subCh := ei.SubscribeUpdates() + defer ei.UnsubscribeUpdates(subId) for { select { From c61c2706c6c220274af50f16e2b7c2b334676f86 Mon Sep 17 00:00:00 2001 From: aarshkshah1992 Date: Mon, 17 Jun 2024 11:47:51 +0400 Subject: [PATCH 12/20] changes as per review --- chain/events/filter/index.go | 141 +++++++++++++++++------------- chain/events/filter/index_test.go | 17 ++-- node/impl/full/eth.go | 22 +++-- 3 files changed, 107 insertions(+), 73 deletions(-) diff --git a/chain/events/filter/index.go b/chain/events/filter/index.go index 4329297d175..432ca0dbd98 100644 --- a/chain/events/filter/index.go +++ b/chain/events/filter/index.go @@ -98,6 +98,8 @@ const ( revertEventSeen = `UPDATE events_seen SET reverted=true WHERE height=? AND tipset_key_cid=?` restoreEventSeen = `UPDATE events_seen SET reverted=false WHERE height=? AND tipset_key_cid=?` insertEventsSeen = `INSERT OR IGNORE INTO events_seen(height, tipset_key_cid, reverted) VALUES(?, ?, ?)` + isTipsetProcessed = `SELECT COUNT(*) > 0 FROM events_seen WHERE tipset_key_cid=?` + getMaxHeightInIndex = `SELECT MAX(height) FROM events_seen` createIndexEventEmitterAddr = `CREATE INDEX IF NOT EXISTS event_emitter_addr ON event (emitter_addr)` createIndexEventTipsetKeyCid = `CREATE INDEX IF NOT EXISTS event_tipset_key_cid ON event (tipset_key_cid);` @@ -131,6 +133,9 @@ type EventIndex struct { stmtRevertEventSeen *sql.Stmt stmtRestoreEventSeen *sql.Stmt + stmtIsTipsetProcessed *sql.Stmt + stmtGetMaxHeightInIndex *sql.Stmt + mu sync.Mutex subIdCounter uint64 updateSubs map[uint64]*updateSub @@ -143,9 +148,9 @@ type updateSub struct { } type EventIndexUpdate struct { - Height uint64 - TipsetCid cid.Cid - Reverted bool + Height abi.ChainEpoch + TipsetKeyCid cid.Cid + Reverted bool } func (ei *EventIndex) initStatements() (err error) { @@ -189,6 +194,16 @@ func (ei *EventIndex) initStatements() (err error) { return xerrors.Errorf("prepare stmtRestoreEventSeen: %w", err) } + ei.stmtIsTipsetProcessed, err = ei.db.Prepare(isTipsetProcessed) + if err != nil { + return xerrors.Errorf("prepare isTipsetProcessed: %w", err) + } + + ei.stmtGetMaxHeightInIndex, err = ei.db.Prepare(getMaxHeightInIndex) + if err != nil { + return xerrors.Errorf("prepare getMaxHeightInIndex: %w", err) + } + return nil } @@ -640,7 +655,7 @@ func (ei *EventIndex) Close() error { return ei.db.Close() } -func (ei *EventIndex) SubscribeUpdates() (uint64, chan *EventIndexUpdate) { +func (ei *EventIndex) SubscribeUpdates() (chan *EventIndexUpdate, func()) { subCtx, subCancel := context.WithCancel(context.Background()) ch := make(chan *EventIndexUpdate) @@ -656,25 +671,25 @@ func (ei *EventIndex) SubscribeUpdates() (uint64, chan *EventIndexUpdate) { ei.updateSubs[subId] = tSub ei.mu.Unlock() - return subId, tSub.ch -} - -func (ei *EventIndex) UnsubscribeUpdates(subId uint64) { - ei.mu.Lock() - tSub, ok := ei.updateSubs[subId] - if !ok { + unSubscribeF := func() { + ei.mu.Lock() + tSub, ok := ei.updateSubs[subId] + if !ok { + ei.mu.Unlock() + return + } + delete(ei.updateSubs, subId) ei.mu.Unlock() - return + + // cancel the subscription + tSub.cancel() } - delete(ei.updateSubs, subId) - ei.mu.Unlock() - // cancel the subscription - tSub.cancel() + return tSub.ch, unSubscribeF } func (ei *EventIndex) GetMaxHeightInIndex(ctx context.Context) (uint64, error) { - row := ei.db.QueryRowContext(ctx, "SELECT MAX(height) FROM events_seen") + row := ei.stmtGetMaxHeightInIndex.QueryRowContext(ctx) var maxHeight uint64 err := row.Scan(&maxHeight) return maxHeight, err @@ -689,7 +704,7 @@ func (ei *EventIndex) IsHeightProcessed(ctx context.Context, height uint64) (boo } func (ei *EventIndex) IsTipsetProcessed(ctx context.Context, tipsetKeyCid []byte) (bool, error) { - row := ei.db.QueryRowContext(ctx, "SELECT COUNT(*) > 0 FROM events_seen WHERE tipset_key_cid = ?", tipsetKeyCid) + row := ei.stmtIsTipsetProcessed.QueryRowContext(ctx, tipsetKeyCid) var exists bool err := row.Scan(&exists) return exists, err @@ -703,13 +718,13 @@ func (ei *EventIndex) CollectEvents(ctx context.Context, te *TipSetEvents, rever // rollback the transaction (a no-op if the transaction was already committed) defer func() { _ = tx.Rollback() }() + tsKeyCid, err := te.msgTs.Key().Cid() + if err != nil { + return xerrors.Errorf("tipset key cid: %w", err) + } + // lets handle the revert case first, since its simpler and we can simply mark all events in this tipset as reverted and return if revert { - tsKeyCid, err := te.msgTs.Key().Cid() - if err != nil { - return xerrors.Errorf("tipset key cid: %w", err) - } - _, err = tx.Stmt(ei.stmtRevertEventsInTipset).Exec(te.msgTs.Height(), te.msgTs.Key().Bytes()) if err != nil { return xerrors.Errorf("revert event: %w", err) @@ -726,16 +741,19 @@ func (ei *EventIndex) CollectEvents(ctx context.Context, te *TipSetEvents, rever } ei.mu.Lock() - tSubs := ei.updateSubs + tSubs := make([]*updateSub, 0, len(ei.updateSubs)) + for _, tSub := range ei.updateSubs { + tSubs = append(tSubs, tSub) + } ei.mu.Unlock() for _, tSub := range tSubs { tSub := tSub select { case tSub.ch <- &EventIndexUpdate{ - Height: uint64(te.msgTs.Height()), - TipsetCid: tsKeyCid, - Reverted: true, + Height: te.msgTs.Height(), + TipsetKeyCid: tsKeyCid, + Reverted: true, }: case <-tSub.ctx.Done(): // subscription was cancelled, ignore @@ -754,12 +772,9 @@ func (ei *EventIndex) CollectEvents(ctx context.Context, te *TipSetEvents, rever if err != nil { return xerrors.Errorf("load executed messages: %w", err) } - tsKeyCid, err := te.msgTs.Key().Cid() - if err != nil { - return xerrors.Errorf("tipset key cid: %w", err) - } eventCount := 0 + isTipSetRevertRestored := false // iterate over all executed messages in this tipset and insert them into the database if they // don't exist, otherwise mark them as not reverted for msgIdx, em := range ems { @@ -848,36 +863,41 @@ func (ei *EventIndex) CollectEvents(ctx context.Context, te *TipSetEvents, rever // this is a sanity check as we should only ever be updating one event if rowsAffected != 1 { log.Warnf("restored %d events but expected only one to exist", rowsAffected) - } - - res, err = tx.Stmt(ei.stmtRestoreEventSeen).Exec( - te.msgTs.Height(), - tsKeyCid.Bytes(), - ) - if err != nil { - return xerrors.Errorf("exec restore event seen: %w", err) - } - - rowsAffected, err = res.RowsAffected() - if err != nil { - return xerrors.Errorf("error getting rows affected: %s", err) - } - if rowsAffected != 1 { - log.Warnf("restored %d events in events_seen but expected only one to exist", rowsAffected) + } else { + isTipSetRevertRestored = true } } eventCount++ } } - // add an entry to the event_seen table for this tipset - _, err = tx.Stmt(ei.stmtInsertEventsSeen).Exec( - te.msgTs.Height(), - tsKeyCid.Bytes(), - false, - ) - if err != nil { - return xerrors.Errorf("exec insert events seen: %w", err) + if isTipSetRevertRestored { + // restore a revert for the tipset if applicable + res, err := tx.Stmt(ei.stmtRestoreEventSeen).Exec( + te.msgTs.Height(), + tsKeyCid.Bytes(), + ) + if err != nil { + return xerrors.Errorf("exec restore event seen: %w", err) + } + + rowsAffected, err := res.RowsAffected() + if err != nil { + return xerrors.Errorf("error getting rows affected: %s", err) + } + if rowsAffected != 1 { + log.Warnf("restored %d events in events_seen but expected only one to exist", rowsAffected) + } + } else { + // add an entry to the event_seen table for this tipset + _, err = tx.Stmt(ei.stmtInsertEventsSeen).Exec( + te.msgTs.Height(), + tsKeyCid.Bytes(), + false, + ) + if err != nil { + return xerrors.Errorf("exec insert events seen: %w", err) + } } err = tx.Commit() @@ -886,16 +906,19 @@ func (ei *EventIndex) CollectEvents(ctx context.Context, te *TipSetEvents, rever } ei.mu.Lock() - tSubs := ei.updateSubs + tSubs := make([]*updateSub, 0, len(ei.updateSubs)) + for _, tSub := range ei.updateSubs { + tSubs = append(tSubs, tSub) + } ei.mu.Unlock() for _, tSub := range tSubs { tSub := tSub select { case tSub.ch <- &EventIndexUpdate{ - Height: uint64(te.msgTs.Height()), - TipsetCid: tsKeyCid, - Reverted: false, + Height: te.msgTs.Height(), + TipsetKeyCid: tsKeyCid, + Reverted: false, }: case <-tSub.ctx.Done(): // subscription was cancelled, ignore diff --git a/chain/events/filter/index_test.go b/chain/events/filter/index_test.go index 10b1648b13f..fad8aa7e2dc 100644 --- a/chain/events/filter/index_test.go +++ b/chain/events/filter/index_test.go @@ -77,7 +77,9 @@ func TestEventIndexPrefillFilter(t *testing.T) { ei, err := NewEventIndex(context.Background(), dbPath, nil) require.NoError(t, err, "create event index") - _, subCh := ei.SubscribeUpdates() + subCh, unSubscribe := ei.SubscribeUpdates() + defer unSubscribe() + out := make(chan *EventIndexUpdate, 1) go func() { tu := <-subCh @@ -113,8 +115,8 @@ func TestEventIndexPrefillFilter(t *testing.T) { require.False(t, seen, "tipset key should not be seen") tu := <-out - require.Equal(t, uint64(14000), tu.Height) - require.EqualValues(t, tsKeyCid, tu.TipsetCid) + require.Equal(t, abi.ChainEpoch(14000), tu.Height) + require.EqualValues(t, tsKeyCid, tu.TipsetKeyCid) require.False(t, tu.Reverted) testCases := []struct { @@ -436,7 +438,8 @@ func TestEventIndexPrefillFilterExcludeReverted(t *testing.T) { require.NoError(t, err, "create event index") tCh := make(chan *EventIndexUpdate, 3) - _, subCh := ei.SubscribeUpdates() + subCh, unSubscribe := ei.SubscribeUpdates() + defer unSubscribe() go func() { cnt := 0 for tu := range subCh { @@ -462,17 +465,17 @@ func TestEventIndexPrefillFilterExcludeReverted(t *testing.T) { tu1 := <-tCh require.EqualValues(t, 14000, tu1.Height) require.False(t, tu1.Reverted) - require.EqualValues(t, reveredCID14000, tu1.TipsetCid) + require.EqualValues(t, reveredCID14000, tu1.TipsetKeyCid) tu2 := <-tCh require.EqualValues(t, 14000, tu2.Height) require.True(t, tu2.Reverted) - require.EqualValues(t, reveredCID14000, tu2.TipsetCid) + require.EqualValues(t, reveredCID14000, tu2.TipsetKeyCid) tu3 := <-tCh require.EqualValues(t, 14000, tu3.Height) require.False(t, tu3.Reverted) - require.EqualValues(t, cid14000, tu3.TipsetCid) + require.EqualValues(t, cid14000, tu3.TipsetKeyCid) inclusiveTestCases := []struct { name string diff --git a/node/impl/full/eth.go b/node/impl/full/eth.go index 443f56d440c..b65494961a6 100644 --- a/node/impl/full/eth.go +++ b/node/impl/full/eth.go @@ -44,6 +44,11 @@ var ErrUnsupported = errors.New("unsupported method") const maxEthFeeHistoryRewardPercentiles = 100 +var ( + // wait for 3 epochs + eventReadTimeout = 90 * time.Second +) + type EthModuleAPI interface { EthBlockNumber(ctx context.Context) (ethtypes.EthUint64, error) EthAccounts(ctx context.Context) ([]ethtypes.EthAddress, error) @@ -1308,6 +1313,13 @@ func (e *EthEventHandler) EthGetLogs(ctx context.Context, filterSpec *ethtypes.E } func (e *EthEventHandler) waitForHeightProcessed(ctx context.Context, height abi.ChainEpoch) error { + ctx, cancel := context.WithTimeout(ctx, eventReadTimeout) + defer cancel() + + if height > e.Chain.GetHeaviestTipSet().Height() { + return xerrors.New("height is in the future") + } + ei := e.EventFilterManager.EventIndex b, err := ei.IsHeightProcessed(ctx, uint64(height)) if err != nil { @@ -1317,17 +1329,13 @@ func (e *EthEventHandler) waitForHeightProcessed(ctx context.Context, height abi return nil } - if height > e.Chain.GetHeaviestTipSet().Height() { - return xerrors.New("height is in the future") - } - - subId, subCh := ei.SubscribeUpdates() - defer ei.UnsubscribeUpdates(subId) + subCh, unSubscribeF := ei.SubscribeUpdates() + defer unSubscribeF() for { select { case tu := <-subCh: - if tu.Height >= uint64(height) { + if tu.Height >= height { return nil } case <-ctx.Done(): From 03e7083e3e63d6e376e3b9ccfd4fcfc9847396f0 Mon Sep 17 00:00:00 2001 From: aarshkshah1992 Date: Mon, 17 Jun 2024 13:02:19 +0400 Subject: [PATCH 13/20] solve issue with missing events --- chain/events/filter/index.go | 24 ++++++------------------ chain/events/filter/index_test.go | 26 ++++++-------------------- node/impl/full/eth.go | 27 +++++++++++++++++++++------ 3 files changed, 33 insertions(+), 44 deletions(-) diff --git a/chain/events/filter/index.go b/chain/events/filter/index.go index 432ca0dbd98..3631a7c6860 100644 --- a/chain/events/filter/index.go +++ b/chain/events/filter/index.go @@ -143,15 +143,11 @@ type EventIndex struct { type updateSub struct { ctx context.Context - ch chan *EventIndexUpdate + ch chan EventIndexUpdated cancel context.CancelFunc } -type EventIndexUpdate struct { - Height abi.ChainEpoch - TipsetKeyCid cid.Cid - Reverted bool -} +type EventIndexUpdated struct{} func (ei *EventIndex) initStatements() (err error) { ei.stmtEventExists, err = ei.db.Prepare(eventExists) @@ -655,9 +651,9 @@ func (ei *EventIndex) Close() error { return ei.db.Close() } -func (ei *EventIndex) SubscribeUpdates() (chan *EventIndexUpdate, func()) { +func (ei *EventIndex) SubscribeUpdates() (chan EventIndexUpdated, func()) { subCtx, subCancel := context.WithCancel(context.Background()) - ch := make(chan *EventIndexUpdate) + ch := make(chan EventIndexUpdated) tSub := &updateSub{ ctx: subCtx, @@ -750,11 +746,7 @@ func (ei *EventIndex) CollectEvents(ctx context.Context, te *TipSetEvents, rever for _, tSub := range tSubs { tSub := tSub select { - case tSub.ch <- &EventIndexUpdate{ - Height: te.msgTs.Height(), - TipsetKeyCid: tsKeyCid, - Reverted: true, - }: + case tSub.ch <- EventIndexUpdated{}: case <-tSub.ctx.Done(): // subscription was cancelled, ignore case <-ctx.Done(): @@ -915,11 +907,7 @@ func (ei *EventIndex) CollectEvents(ctx context.Context, te *TipSetEvents, rever for _, tSub := range tSubs { tSub := tSub select { - case tSub.ch <- &EventIndexUpdate{ - Height: te.msgTs.Height(), - TipsetKeyCid: tsKeyCid, - Reverted: false, - }: + case tSub.ch <- EventIndexUpdated{}: case <-tSub.ctx.Done(): // subscription was cancelled, ignore case <-ctx.Done(): diff --git a/chain/events/filter/index_test.go b/chain/events/filter/index_test.go index fad8aa7e2dc..b9e5c49a05c 100644 --- a/chain/events/filter/index_test.go +++ b/chain/events/filter/index_test.go @@ -80,7 +80,7 @@ func TestEventIndexPrefillFilter(t *testing.T) { subCh, unSubscribe := ei.SubscribeUpdates() defer unSubscribe() - out := make(chan *EventIndexUpdate, 1) + out := make(chan EventIndexUpdated, 1) go func() { tu := <-subCh out <- tu @@ -114,10 +114,7 @@ func TestEventIndexPrefillFilter(t *testing.T) { require.NoError(t, err) require.False(t, seen, "tipset key should not be seen") - tu := <-out - require.Equal(t, abi.ChainEpoch(14000), tu.Height) - require.EqualValues(t, tsKeyCid, tu.TipsetKeyCid) - require.False(t, tu.Reverted) + _ = <-out testCases := []struct { name string @@ -437,7 +434,7 @@ func TestEventIndexPrefillFilterExcludeReverted(t *testing.T) { ei, err := NewEventIndex(context.Background(), dbPath, nil) require.NoError(t, err, "create event index") - tCh := make(chan *EventIndexUpdate, 3) + tCh := make(chan EventIndexUpdated, 3) subCh, unSubscribe := ei.SubscribeUpdates() defer unSubscribe() go func() { @@ -462,20 +459,9 @@ func TestEventIndexPrefillFilterExcludeReverted(t *testing.T) { require.NoError(t, err, "collect events") } - tu1 := <-tCh - require.EqualValues(t, 14000, tu1.Height) - require.False(t, tu1.Reverted) - require.EqualValues(t, reveredCID14000, tu1.TipsetKeyCid) - - tu2 := <-tCh - require.EqualValues(t, 14000, tu2.Height) - require.True(t, tu2.Reverted) - require.EqualValues(t, reveredCID14000, tu2.TipsetKeyCid) - - tu3 := <-tCh - require.EqualValues(t, 14000, tu3.Height) - require.False(t, tu3.Reverted) - require.EqualValues(t, cid14000, tu3.TipsetKeyCid) + _ = <-tCh + _ = <-tCh + _ = <-tCh inclusiveTestCases := []struct { name string diff --git a/node/impl/full/eth.go b/node/impl/full/eth.go index b65494961a6..83334549432 100644 --- a/node/impl/full/eth.go +++ b/node/impl/full/eth.go @@ -1313,14 +1313,15 @@ func (e *EthEventHandler) EthGetLogs(ctx context.Context, filterSpec *ethtypes.E } func (e *EthEventHandler) waitForHeightProcessed(ctx context.Context, height abi.ChainEpoch) error { - ctx, cancel := context.WithTimeout(ctx, eventReadTimeout) - defer cancel() - + ei := e.EventFilterManager.EventIndex if height > e.Chain.GetHeaviestTipSet().Height() { return xerrors.New("height is in the future") } - ei := e.EventFilterManager.EventIndex + ctx, cancel := context.WithTimeout(ctx, eventReadTimeout) + defer cancel() + + // if the height we're interested in has already been indexed -> there's nothing to do here b, err := ei.IsHeightProcessed(ctx, uint64(height)) if err != nil { return xerrors.Errorf("failed to check if event index has events for given height: %w", err) @@ -1329,13 +1330,27 @@ func (e *EthEventHandler) waitForHeightProcessed(ctx context.Context, height abi return nil } + // subscribe for updates to the event index subCh, unSubscribeF := ei.SubscribeUpdates() defer unSubscribeF() + // it could be that the event index was update while the subscription was being processed -> check if index has what we need now + b, err = ei.IsHeightProcessed(ctx, uint64(height)) + if err != nil { + return xerrors.Errorf("failed to check if event index has events for given height: %w", err) + } + if b { + return nil + } + for { select { - case tu := <-subCh: - if tu.Height >= height { + case <-subCh: + b, err = ei.IsHeightProcessed(ctx, uint64(height)) + if err != nil { + return xerrors.Errorf("failed to check if event index has events for given height: %w", err) + } + if b { return nil } case <-ctx.Done(): From f5fbaa74c36590ff0de0bea316cc0d5da3d5297e Mon Sep 17 00:00:00 2001 From: aarshkshah1992 Date: Mon, 17 Jun 2024 13:52:10 +0400 Subject: [PATCH 14/20] use correct var --- node/impl/full/eth.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/node/impl/full/eth.go b/node/impl/full/eth.go index 83334549432..a569452e53e 100644 --- a/node/impl/full/eth.go +++ b/node/impl/full/eth.go @@ -46,7 +46,7 @@ const maxEthFeeHistoryRewardPercentiles = 100 var ( // wait for 3 epochs - eventReadTimeout = 90 * time.Second + eventReadTimeout = time.Duration(3 * builtintypes.EpochDurationSeconds) ) type EthModuleAPI interface { From d81803dc31d63336f6be020b6a7543b465951ae3 Mon Sep 17 00:00:00 2001 From: aarshkshah1992 Date: Wed, 19 Jun 2024 16:10:36 +0400 Subject: [PATCH 15/20] changes as per review --- chain/events/filter/index.go | 56 ++++++++++-------------------------- node/impl/full/eth.go | 5 ++++ 2 files changed, 20 insertions(+), 41 deletions(-) diff --git a/chain/events/filter/index.go b/chain/events/filter/index.go index 3631a7c6860..0b265a9a10c 100644 --- a/chain/events/filter/index.go +++ b/chain/events/filter/index.go @@ -97,7 +97,7 @@ const ( restoreEvent = `UPDATE event SET reverted=false WHERE height=? AND tipset_key=? AND tipset_key_cid=? AND emitter_addr=? AND event_index=? AND message_cid=? AND message_index=?` revertEventSeen = `UPDATE events_seen SET reverted=true WHERE height=? AND tipset_key_cid=?` restoreEventSeen = `UPDATE events_seen SET reverted=false WHERE height=? AND tipset_key_cid=?` - insertEventsSeen = `INSERT OR IGNORE INTO events_seen(height, tipset_key_cid, reverted) VALUES(?, ?, ?)` + upsertEventsSeen = `INSERT INTO events_seen(height, tipset_key_cid, reverted) VALUES(?, ?, ?) ON CONFLICT(height, tipset_key_cid) DO UPDATE SET reverted=false` isTipsetProcessed = `SELECT COUNT(*) > 0 FROM events_seen WHERE tipset_key_cid=?` getMaxHeightInIndex = `SELECT MAX(height) FROM events_seen` @@ -129,7 +129,7 @@ type EventIndex struct { stmtInsertEntry *sql.Stmt stmtRevertEventsInTipset *sql.Stmt stmtRestoreEvent *sql.Stmt - stmtInsertEventsSeen *sql.Stmt + stmtUpsertEventsSeen *sql.Stmt stmtRevertEventSeen *sql.Stmt stmtRestoreEventSeen *sql.Stmt @@ -175,9 +175,9 @@ func (ei *EventIndex) initStatements() (err error) { return xerrors.Errorf("prepare stmtRestoreEvent: %w", err) } - ei.stmtInsertEventsSeen, err = ei.db.Prepare(insertEventsSeen) + ei.stmtUpsertEventsSeen, err = ei.db.Prepare(upsertEventsSeen) if err != nil { - return xerrors.Errorf("prepare stmtInsertEventsSeen: %w", err) + return xerrors.Errorf("prepare stmtUpsertEventsSeen: %w", err) } ei.stmtRevertEventSeen, err = ei.db.Prepare(revertEventSeen) @@ -498,13 +498,8 @@ func (ei *EventIndex) migrateToVersion6(ctx context.Context) error { // INSERT an entry in the events_seen table for all epochs we do have events for in our DB _, err = tx.ExecContext(ctx, ` - INSERT INTO events_seen (height, tipset_key_cid, reverted) - SELECT e.height, e.tipset_key_cid, e.reverted - FROM event e - WHERE NOT EXISTS ( - SELECT 1 FROM events_seen es - WHERE es.height = e.height AND es.tipset_key_cid = e.tipset_key_cid - ) + INSERT OR IGNORE INTO events_seen (height, tipset_key_cid, reverted) + SELECT DISTINCT height, tipset_key_cid, reverted FROM event `) if err != nil { return xerrors.Errorf("insert events into events_seen: %w", err) @@ -766,7 +761,6 @@ func (ei *EventIndex) CollectEvents(ctx context.Context, te *TipSetEvents, rever } eventCount := 0 - isTipSetRevertRestored := false // iterate over all executed messages in this tipset and insert them into the database if they // don't exist, otherwise mark them as not reverted for msgIdx, em := range ems { @@ -855,41 +849,21 @@ func (ei *EventIndex) CollectEvents(ctx context.Context, te *TipSetEvents, rever // this is a sanity check as we should only ever be updating one event if rowsAffected != 1 { log.Warnf("restored %d events but expected only one to exist", rowsAffected) - } else { - isTipSetRevertRestored = true } } eventCount++ } } - if isTipSetRevertRestored { - // restore a revert for the tipset if applicable - res, err := tx.Stmt(ei.stmtRestoreEventSeen).Exec( - te.msgTs.Height(), - tsKeyCid.Bytes(), - ) - if err != nil { - return xerrors.Errorf("exec restore event seen: %w", err) - } - - rowsAffected, err := res.RowsAffected() - if err != nil { - return xerrors.Errorf("error getting rows affected: %s", err) - } - if rowsAffected != 1 { - log.Warnf("restored %d events in events_seen but expected only one to exist", rowsAffected) - } - } else { - // add an entry to the event_seen table for this tipset - _, err = tx.Stmt(ei.stmtInsertEventsSeen).Exec( - te.msgTs.Height(), - tsKeyCid.Bytes(), - false, - ) - if err != nil { - return xerrors.Errorf("exec insert events seen: %w", err) - } + // this statement will mark the tipset as processed and will insert a new row if it doesn't exist + // or update the reverted field to false if it does + _, err = tx.Stmt(ei.stmtUpsertEventsSeen).Exec( + te.msgTs.Height(), + tsKeyCid.Bytes(), + false, + ) + if err != nil { + return xerrors.Errorf("exec upsert events seen: %w", err) } err = tx.Commit() diff --git a/node/impl/full/eth.go b/node/impl/full/eth.go index a569452e53e..a3826decba3 100644 --- a/node/impl/full/eth.go +++ b/node/impl/full/eth.go @@ -1277,10 +1277,15 @@ func (e *EthEventHandler) EthGetLogs(ctx context.Context, filterSpec *ethtypes.E if maxHeight == -1 { maxHeight = e.Chain.GetHeaviestTipSet().Height() } + if maxHeight > e.Chain.GetHeaviestTipSet().Height() { + return nil, xerrors.Errorf("max height requested is greater than the heaviest tipset") + } + err := e.waitForHeightProcessed(ctx, maxHeight) if err != nil { return nil, err } + } else { ts, err := e.Chain.GetTipSetByCid(ctx, pf.tipsetCid) if err != nil { From 11ba4dd17f62816390f7231b16126656615b0e35 Mon Sep 17 00:00:00 2001 From: aarshkshah1992 Date: Thu, 20 Jun 2024 08:39:56 +0400 Subject: [PATCH 16/20] add unique constraint --- chain/events/filter/index.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/chain/events/filter/index.go b/chain/events/filter/index.go index 0b265a9a10c..2b316e9008d 100644 --- a/chain/events/filter/index.go +++ b/chain/events/filter/index.go @@ -114,7 +114,8 @@ const ( id INTEGER PRIMARY KEY, height INTEGER NOT NULL, tipset_key_cid BLOB NOT NULL, - reverted INTEGER NOT NULL + reverted INTEGER NOT NULL, + UNIQUE(height, tipset_key_cid) )` createIndexEventsSeenHeight = `CREATE INDEX IF NOT EXISTS events_seen_height ON events_seen (height);` From f7b8d39d23ee62b42f0883beb74b0d276ef2b176 Mon Sep 17 00:00:00 2001 From: aarshkshah1992 Date: Thu, 20 Jun 2024 09:41:17 +0400 Subject: [PATCH 17/20] fix test wait --- node/impl/full/eth.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/node/impl/full/eth.go b/node/impl/full/eth.go index a3826decba3..c85b04901b2 100644 --- a/node/impl/full/eth.go +++ b/node/impl/full/eth.go @@ -46,7 +46,7 @@ const maxEthFeeHistoryRewardPercentiles = 100 var ( // wait for 3 epochs - eventReadTimeout = time.Duration(3 * builtintypes.EpochDurationSeconds) + eventReadTimeout = 90 * time.Second ) type EthModuleAPI interface { From 306adedcca4b251170d0f9af6035a7219bc24da7 Mon Sep 17 00:00:00 2001 From: aarshkshah1992 Date: Thu, 20 Jun 2024 10:00:23 +0400 Subject: [PATCH 18/20] check for events at min_height as well --- chain/events/filter/index.go | 16 +++++++++++----- chain/events/filter/index_test.go | 4 ++++ node/impl/full/eth.go | 8 ++++++++ 3 files changed, 23 insertions(+), 5 deletions(-) diff --git a/chain/events/filter/index.go b/chain/events/filter/index.go index 2b316e9008d..4d7de5185f4 100644 --- a/chain/events/filter/index.go +++ b/chain/events/filter/index.go @@ -100,6 +100,7 @@ const ( upsertEventsSeen = `INSERT INTO events_seen(height, tipset_key_cid, reverted) VALUES(?, ?, ?) ON CONFLICT(height, tipset_key_cid) DO UPDATE SET reverted=false` isTipsetProcessed = `SELECT COUNT(*) > 0 FROM events_seen WHERE tipset_key_cid=?` getMaxHeightInIndex = `SELECT MAX(height) FROM events_seen` + isHeightProcessed = `SELECT COUNT(*) > 0 FROM events_seen WHERE height=?` createIndexEventEmitterAddr = `CREATE INDEX IF NOT EXISTS event_emitter_addr ON event (emitter_addr)` createIndexEventTipsetKeyCid = `CREATE INDEX IF NOT EXISTS event_tipset_key_cid ON event (tipset_key_cid);` @@ -136,6 +137,7 @@ type EventIndex struct { stmtIsTipsetProcessed *sql.Stmt stmtGetMaxHeightInIndex *sql.Stmt + stmtIsHeightProcessed *sql.Stmt mu sync.Mutex subIdCounter uint64 @@ -201,6 +203,11 @@ func (ei *EventIndex) initStatements() (err error) { return xerrors.Errorf("prepare getMaxHeightInIndex: %w", err) } + ei.stmtIsHeightProcessed, err = ei.db.Prepare(isHeightProcessed) + if err != nil { + return xerrors.Errorf("prepare isHeightProcessed: %w", err) + } + return nil } @@ -688,11 +695,10 @@ func (ei *EventIndex) GetMaxHeightInIndex(ctx context.Context) (uint64, error) { } func (ei *EventIndex) IsHeightProcessed(ctx context.Context, height uint64) (bool, error) { - mh, err := ei.GetMaxHeightInIndex(ctx) - if err != nil { - return false, err - } - return height <= mh, nil + row := ei.stmtIsHeightProcessed.QueryRowContext(ctx, height) + var exists bool + err := row.Scan(&exists) + return exists, err } func (ei *EventIndex) IsTipsetProcessed(ctx context.Context, tipsetKeyCid []byte) (bool, error) { diff --git a/chain/events/filter/index_test.go b/chain/events/filter/index_test.go index b9e5c49a05c..10b3eb57779 100644 --- a/chain/events/filter/index_test.go +++ b/chain/events/filter/index_test.go @@ -102,6 +102,10 @@ func TestEventIndexPrefillFilter(t *testing.T) { require.NoError(t, err) require.False(t, b) + b, err = ei.IsHeightProcessed(context.Background(), 13000) + require.NoError(t, err) + require.False(t, b) + tsKey := events14000.msgTs.Key() tsKeyCid, err := tsKey.Cid() require.NoError(t, err, "tipset key cid") diff --git a/node/impl/full/eth.go b/node/impl/full/eth.go index c85b04901b2..3cbffe84a2d 100644 --- a/node/impl/full/eth.go +++ b/node/impl/full/eth.go @@ -1286,6 +1286,14 @@ func (e *EthEventHandler) EthGetLogs(ctx context.Context, filterSpec *ethtypes.E return nil, err } + // should also have the minHeight in the filter indexed + b, err := e.EventFilterManager.EventIndex.IsHeightProcessed(ctx, uint64(pf.minHeight)) + if err != nil { + return nil, xerrors.Errorf("failed to check if event index has events for the minheight: %w", err) + } + if !b { + return nil, xerrors.Errorf("event index does not have event for epoch %d", pf.minHeight) + } } else { ts, err := e.Chain.GetTipSetByCid(ctx, pf.tipsetCid) if err != nil { From 48692188fc0a22a9748fae017d8495d758f0d0ce Mon Sep 17 00:00:00 2001 From: Aarsh Shah Date: Thu, 20 Jun 2024 10:53:33 +0400 Subject: [PATCH 19/20] Apply suggestions from code review Co-authored-by: Rod Vagg --- chain/events/filter/index.go | 3 +-- node/impl/full/eth.go | 10 ++++------ 2 files changed, 5 insertions(+), 8 deletions(-) diff --git a/chain/events/filter/index.go b/chain/events/filter/index.go index 4d7de5185f4..aacd04f15c0 100644 --- a/chain/events/filter/index.go +++ b/chain/events/filter/index.go @@ -97,7 +97,7 @@ const ( restoreEvent = `UPDATE event SET reverted=false WHERE height=? AND tipset_key=? AND tipset_key_cid=? AND emitter_addr=? AND event_index=? AND message_cid=? AND message_index=?` revertEventSeen = `UPDATE events_seen SET reverted=true WHERE height=? AND tipset_key_cid=?` restoreEventSeen = `UPDATE events_seen SET reverted=false WHERE height=? AND tipset_key_cid=?` - upsertEventsSeen = `INSERT INTO events_seen(height, tipset_key_cid, reverted) VALUES(?, ?, ?) ON CONFLICT(height, tipset_key_cid) DO UPDATE SET reverted=false` + upsertEventsSeen = `INSERT INTO events_seen(height, tipset_key_cid, reverted) VALUES(?, ?, false) ON CONFLICT(height, tipset_key_cid) DO UPDATE SET reverted=false` isTipsetProcessed = `SELECT COUNT(*) > 0 FROM events_seen WHERE tipset_key_cid=?` getMaxHeightInIndex = `SELECT MAX(height) FROM events_seen` isHeightProcessed = `SELECT COUNT(*) > 0 FROM events_seen WHERE height=?` @@ -867,7 +867,6 @@ func (ei *EventIndex) CollectEvents(ctx context.Context, te *TipSetEvents, rever _, err = tx.Stmt(ei.stmtUpsertEventsSeen).Exec( te.msgTs.Height(), tsKeyCid.Bytes(), - false, ) if err != nil { return xerrors.Errorf("exec upsert events seen: %w", err) diff --git a/node/impl/full/eth.go b/node/impl/full/eth.go index 3cbffe84a2d..26eb1b72562 100644 --- a/node/impl/full/eth.go +++ b/node/impl/full/eth.go @@ -1278,7 +1278,7 @@ func (e *EthEventHandler) EthGetLogs(ctx context.Context, filterSpec *ethtypes.E maxHeight = e.Chain.GetHeaviestTipSet().Height() } if maxHeight > e.Chain.GetHeaviestTipSet().Height() { - return nil, xerrors.Errorf("max height requested is greater than the heaviest tipset") + return nil, xerrors.Errorf("maxHeight requested is greater than the heaviest tipset") } err := e.waitForHeightProcessed(ctx, maxHeight) @@ -1289,7 +1289,7 @@ func (e *EthEventHandler) EthGetLogs(ctx context.Context, filterSpec *ethtypes.E // should also have the minHeight in the filter indexed b, err := e.EventFilterManager.EventIndex.IsHeightProcessed(ctx, uint64(pf.minHeight)) if err != nil { - return nil, xerrors.Errorf("failed to check if event index has events for the minheight: %w", err) + return nil, xerrors.Errorf("failed to check if event index has events for the minHeight: %w", err) } if !b { return nil, xerrors.Errorf("event index does not have event for epoch %d", pf.minHeight) @@ -1335,11 +1335,9 @@ func (e *EthEventHandler) waitForHeightProcessed(ctx context.Context, height abi defer cancel() // if the height we're interested in has already been indexed -> there's nothing to do here - b, err := ei.IsHeightProcessed(ctx, uint64(height)) - if err != nil { + if b, err := ei.IsHeightProcessed(ctx, uint64(height)); err != nil { return xerrors.Errorf("failed to check if event index has events for given height: %w", err) - } - if b { + } else if b { return nil } From f42a5f4958cbf0e7921b5bbf6b7dcbeb475462bd Mon Sep 17 00:00:00 2001 From: aarshkshah1992 Date: Thu, 20 Jun 2024 11:03:54 +0400 Subject: [PATCH 20/20] reduce duplication --- node/impl/full/eth.go | 18 ++++++------------ 1 file changed, 6 insertions(+), 12 deletions(-) diff --git a/node/impl/full/eth.go b/node/impl/full/eth.go index 26eb1b72562..27d7002e440 100644 --- a/node/impl/full/eth.go +++ b/node/impl/full/eth.go @@ -1287,11 +1287,9 @@ func (e *EthEventHandler) EthGetLogs(ctx context.Context, filterSpec *ethtypes.E } // should also have the minHeight in the filter indexed - b, err := e.EventFilterManager.EventIndex.IsHeightProcessed(ctx, uint64(pf.minHeight)) - if err != nil { + if b, err := e.EventFilterManager.EventIndex.IsHeightProcessed(ctx, uint64(pf.minHeight)); err != nil { return nil, xerrors.Errorf("failed to check if event index has events for the minHeight: %w", err) - } - if !b { + } else if !b { return nil, xerrors.Errorf("event index does not have event for epoch %d", pf.minHeight) } } else { @@ -1346,22 +1344,18 @@ func (e *EthEventHandler) waitForHeightProcessed(ctx context.Context, height abi defer unSubscribeF() // it could be that the event index was update while the subscription was being processed -> check if index has what we need now - b, err = ei.IsHeightProcessed(ctx, uint64(height)) - if err != nil { + if b, err := ei.IsHeightProcessed(ctx, uint64(height)); err != nil { return xerrors.Errorf("failed to check if event index has events for given height: %w", err) - } - if b { + } else if b { return nil } for { select { case <-subCh: - b, err = ei.IsHeightProcessed(ctx, uint64(height)) - if err != nil { + if b, err := ei.IsHeightProcessed(ctx, uint64(height)); err != nil { return xerrors.Errorf("failed to check if event index has events for given height: %w", err) - } - if b { + } else if b { return nil } case <-ctx.Done():