From 6501c3a9816e69566015f8de3319a989e6155e06 Mon Sep 17 00:00:00 2001 From: aarshkshah1992 Date: Sat, 14 Sep 2024 21:55:30 +0400 Subject: [PATCH 1/3] auto repair events --- chain/index/events.go | 16 +++++++++++++++- chain/index/indexer.go | 6 ++++++ chain/index/interface.go | 1 + node/modules/chainindex.go | 5 +++++ 4 files changed, 27 insertions(+), 1 deletion(-) diff --git a/chain/index/events.go b/chain/index/events.go index 6000ef348a..2ddf0a6d32 100644 --- a/chain/index/events.go +++ b/chain/index/events.go @@ -163,7 +163,21 @@ func (si *SqliteIndexer) loadExecutedMessages(ctx context.Context, msgTs, rctTs eventsArr, err := amt4.LoadAMT(ctx, st, *rct.EventsRoot, amt4.UseTreeBitWidth(types.EventAMTBitwidth)) if err != nil { - return nil, xerrors.Errorf("error loading events amt: %w", err) + if si.tipsetExecutorFnc == nil { + return nil, xerrors.Errorf("failed to load events amt for message %s: %w", ems[i].msg.Cid(), err) + } + log.Warnf("failed to load events amt for message %s: %s; recomputing tipset state to regenerate events", ems[i].msg.Cid(), err) + + _, _, err = si.tipsetExecutorFnc(ctx, msgTs) + if err != nil { + return nil, xerrors.Errorf("failed to recompute missing events; failed to recompute tipset state: %w", err) + } + + eventsArr, err = amt4.LoadAMT(ctx, st, *rct.EventsRoot, amt4.UseTreeBitWidth(types.EventAMTBitwidth)) + if err != nil { + return nil, xerrors.Errorf("failed to load events amt for message %s: %w", ems[i].msg.Cid(), err) + } + log.Infof("successfully recomputed tipset state and loaded events amt for message %s", ems[i].msg.Cid()) } ems[i].evs = make([]types.Event, eventsArr.Len()) diff --git a/chain/index/indexer.go b/chain/index/indexer.go index c4257527ea..afe3ca7094 100644 --- a/chain/index/indexer.go +++ b/chain/index/indexer.go @@ -21,6 +21,7 @@ var _ Indexer = (*SqliteIndexer)(nil) // IdToRobustAddrFunc is a function type that resolves an actor ID to a robust address type IdToRobustAddrFunc func(ctx context.Context, emitter abi.ActorID, ts *types.TipSet) (address.Address, bool) +type tipsetExecutorFnc func(ctx context.Context, ts *types.TipSet) (cid.Cid, cid.Cid, error) type preparedStatements struct { insertEthTxHashStmt *sql.Stmt @@ -53,6 +54,7 @@ type SqliteIndexer struct { cs ChainStore idToRobustAddrFunc IdToRobustAddrFunc + tipsetExecutorFnc tipsetExecutorFnc stmts *preparedStatements @@ -120,6 +122,10 @@ func (si *SqliteIndexer) SetIdToRobustAddrFunc(idToRobustAddrFunc IdToRobustAddr si.idToRobustAddrFunc = idToRobustAddrFunc } +func (si *SqliteIndexer) SetTipsetExecutorFnc(tipsetExecutorFnc tipsetExecutorFnc) { + si.tipsetExecutorFnc = tipsetExecutorFnc +} + func (si *SqliteIndexer) Close() error { si.closeLk.Lock() defer si.closeLk.Unlock() diff --git a/chain/index/interface.go b/chain/index/interface.go index 8e695d95d3..7fc21e5e7a 100644 --- a/chain/index/interface.go +++ b/chain/index/interface.go @@ -56,6 +56,7 @@ type Indexer interface { IndexEthTxHash(ctx context.Context, txHash ethtypes.EthHash, c cid.Cid) error SetIdToRobustAddrFunc(idToRobustAddrFunc IdToRobustAddrFunc) + SetTipsetExecutorFnc(tipsetExecutorFnc tipsetExecutorFnc) Apply(ctx context.Context, from, to *types.TipSet) error Revert(ctx context.Context, from, to *types.TipSet) error diff --git a/node/modules/chainindex.go b/node/modules/chainindex.go index 3b8077c072..9ca8da86e6 100644 --- a/node/modules/chainindex.go +++ b/node/modules/chainindex.go @@ -9,6 +9,7 @@ import ( "github.com/filecoin-project/go-address" "github.com/filecoin-project/go-state-types/abi" + "github.com/ipfs/go-cid" "github.com/filecoin-project/lotus/chain/events" "github.com/filecoin-project/lotus/chain/index" @@ -70,6 +71,10 @@ func InitChainIndexer(lc fx.Lifecycle, mctx helpers.MetricsCtx, indexer index.In return *actor.DelegatedAddress, true }) + indexer.SetTipsetExecutorFnc(func(ctx context.Context, ts *types.TipSet) (cid.Cid, cid.Cid, error) { + return sm.RecomputeTipSetState(ctx, ts) + }) + ch, err := mp.Updates(ctx) if err != nil { return err From bd7a132feed488e3b8caa46737aeaded4e181f9c Mon Sep 17 00:00:00 2001 From: aarshkshah1992 Date: Mon, 16 Sep 2024 10:07:22 +0400 Subject: [PATCH 2/3] make jen --- node/modules/chainindex.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/node/modules/chainindex.go b/node/modules/chainindex.go index 9ca8da86e6..c48df5c33e 100644 --- a/node/modules/chainindex.go +++ b/node/modules/chainindex.go @@ -4,12 +4,12 @@ import ( "context" "path/filepath" + "github.com/ipfs/go-cid" "go.uber.org/fx" "golang.org/x/xerrors" "github.com/filecoin-project/go-address" "github.com/filecoin-project/go-state-types/abi" - "github.com/ipfs/go-cid" "github.com/filecoin-project/lotus/chain/events" "github.com/filecoin-project/lotus/chain/index" From 3070eafa26de1239e0a8e3980b87c704a1103845 Mon Sep 17 00:00:00 2001 From: aarshkshah1992 Date: Mon, 16 Sep 2024 12:18:06 +0400 Subject: [PATCH 3/3] fix leaky abstraction --- chain/index/events.go | 5 ++--- chain/index/indexer.go | 10 +++++----- chain/index/interface.go | 2 +- node/modules/chainindex.go | 6 +++--- 4 files changed, 11 insertions(+), 12 deletions(-) diff --git a/chain/index/events.go b/chain/index/events.go index 2ddf0a6d32..644f5d9e54 100644 --- a/chain/index/events.go +++ b/chain/index/events.go @@ -163,13 +163,12 @@ func (si *SqliteIndexer) loadExecutedMessages(ctx context.Context, msgTs, rctTs eventsArr, err := amt4.LoadAMT(ctx, st, *rct.EventsRoot, amt4.UseTreeBitWidth(types.EventAMTBitwidth)) if err != nil { - if si.tipsetExecutorFnc == nil { + if si.recomputeTipSetStateFunc == nil { return nil, xerrors.Errorf("failed to load events amt for message %s: %w", ems[i].msg.Cid(), err) } log.Warnf("failed to load events amt for message %s: %s; recomputing tipset state to regenerate events", ems[i].msg.Cid(), err) - _, _, err = si.tipsetExecutorFnc(ctx, msgTs) - if err != nil { + if err := si.recomputeTipSetStateFunc(ctx, msgTs); err != nil { return nil, xerrors.Errorf("failed to recompute missing events; failed to recompute tipset state: %w", err) } diff --git a/chain/index/indexer.go b/chain/index/indexer.go index afe3ca7094..dc2c9be246 100644 --- a/chain/index/indexer.go +++ b/chain/index/indexer.go @@ -21,7 +21,7 @@ var _ Indexer = (*SqliteIndexer)(nil) // IdToRobustAddrFunc is a function type that resolves an actor ID to a robust address type IdToRobustAddrFunc func(ctx context.Context, emitter abi.ActorID, ts *types.TipSet) (address.Address, bool) -type tipsetExecutorFnc func(ctx context.Context, ts *types.TipSet) (cid.Cid, cid.Cid, error) +type recomputeTipSetStateFunc func(ctx context.Context, ts *types.TipSet) error type preparedStatements struct { insertEthTxHashStmt *sql.Stmt @@ -53,8 +53,8 @@ type SqliteIndexer struct { db *sql.DB cs ChainStore - idToRobustAddrFunc IdToRobustAddrFunc - tipsetExecutorFnc tipsetExecutorFnc + idToRobustAddrFunc IdToRobustAddrFunc + recomputeTipSetStateFunc recomputeTipSetStateFunc stmts *preparedStatements @@ -122,8 +122,8 @@ func (si *SqliteIndexer) SetIdToRobustAddrFunc(idToRobustAddrFunc IdToRobustAddr si.idToRobustAddrFunc = idToRobustAddrFunc } -func (si *SqliteIndexer) SetTipsetExecutorFnc(tipsetExecutorFnc tipsetExecutorFnc) { - si.tipsetExecutorFnc = tipsetExecutorFnc +func (si *SqliteIndexer) SetRecomputeTipSetStateFunc(recomputeTipSetStateFunc recomputeTipSetStateFunc) { + si.recomputeTipSetStateFunc = recomputeTipSetStateFunc } func (si *SqliteIndexer) Close() error { diff --git a/chain/index/interface.go b/chain/index/interface.go index 7fc21e5e7a..67b43f3be4 100644 --- a/chain/index/interface.go +++ b/chain/index/interface.go @@ -56,7 +56,7 @@ type Indexer interface { IndexEthTxHash(ctx context.Context, txHash ethtypes.EthHash, c cid.Cid) error SetIdToRobustAddrFunc(idToRobustAddrFunc IdToRobustAddrFunc) - SetTipsetExecutorFnc(tipsetExecutorFnc tipsetExecutorFnc) + SetRecomputeTipSetStateFunc(recomputeTipSetStateFunc recomputeTipSetStateFunc) Apply(ctx context.Context, from, to *types.TipSet) error Revert(ctx context.Context, from, to *types.TipSet) error diff --git a/node/modules/chainindex.go b/node/modules/chainindex.go index c48df5c33e..94a366f17f 100644 --- a/node/modules/chainindex.go +++ b/node/modules/chainindex.go @@ -4,7 +4,6 @@ import ( "context" "path/filepath" - "github.com/ipfs/go-cid" "go.uber.org/fx" "golang.org/x/xerrors" @@ -71,8 +70,9 @@ func InitChainIndexer(lc fx.Lifecycle, mctx helpers.MetricsCtx, indexer index.In return *actor.DelegatedAddress, true }) - indexer.SetTipsetExecutorFnc(func(ctx context.Context, ts *types.TipSet) (cid.Cid, cid.Cid, error) { - return sm.RecomputeTipSetState(ctx, ts) + indexer.SetRecomputeTipSetStateFunc(func(ctx context.Context, ts *types.TipSet) error { + _, _, err := sm.RecomputeTipSetState(ctx, ts) + return err }) ch, err := mp.Updates(ctx)