From f5951fcef3fd1caaf7eb15ecf1b05e5e0ddc2fd5 Mon Sep 17 00:00:00 2001 From: Rod Vagg Date: Wed, 16 Oct 2024 12:58:46 +1100 Subject: [PATCH] chore(events): improve perf for parallel event filter matching 1. Cache address look-ups for the given tipset across filters 2. Run the filters in an errgroup --- CHANGELOG.md | 1 + chain/events/filter/event.go | 75 +++++++++++++++++++++---------- chain/events/filter/event_test.go | 7 ++- node/modules/actorevent.go | 10 ++--- 4 files changed, 62 insertions(+), 31 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 6159f3c1aa6..3eeb44b7dd5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,7 @@ # Lotus changelog # UNRELEASED +- Improve eth filter performance for nodes serving many clients. ([filecoin-project/lotus#12603](https://github.com/filecoin-project/lotus/pull/12603)) # Node and Miner v1.31.0-rc1 / 2024-11-12 diff --git a/chain/events/filter/event.go b/chain/events/filter/event.go index ccf3bd0c941..b0233d33ab3 100644 --- a/chain/events/filter/event.go +++ b/chain/events/filter/event.go @@ -9,6 +9,7 @@ import ( "github.com/ipfs/go-cid" cbg "github.com/whyrusleeping/cbor-gen" + "golang.org/x/sync/errgroup" "golang.org/x/xerrors" "github.com/filecoin-project/go-address" @@ -28,7 +29,10 @@ func isIndexedValue(b uint8) bool { return b&(types.EventFlagIndexedKey|types.EventFlagIndexedValue) > 0 } -type AddressResolver func(context.Context, abi.ActorID, *types.TipSet) (address.Address, bool) +// AddressResolver is a function that resolves an actor ID to an address. If the +// actor ID cannot be resolved to an address, the function must return +// address.Undef. +type AddressResolver func(context.Context, abi.ActorID, *types.TipSet) address.Address type EventFilter interface { Filter @@ -77,9 +81,6 @@ func (f *eventFilter) CollectEvents(ctx context.Context, te *TipSetEvents, rever return nil } - // cache of lookups between actor id and f4 address - addressLookups := make(map[abi.ActorID]address.Address) - ems, err := te.messages(ctx) if err != nil { return xerrors.Errorf("load executed messages: %w", err) @@ -89,16 +90,10 @@ func (f *eventFilter) CollectEvents(ctx context.Context, te *TipSetEvents, rever for msgIdx, em := range ems { for _, ev := range em.Events() { - // lookup address corresponding to the actor id - addr, found := addressLookups[ev.Emitter] - if !found { - var ok bool - addr, ok = resolver(ctx, ev.Emitter, te.rctTs) - if !ok { - // not an address we will be able to match against - continue - } - addressLookups[ev.Emitter] = addr + addr := resolver(ctx, ev.Emitter, te.rctTs) + if addr == address.Undef { + // not an address we will be able to match against + continue } if !f.matchAddress(addr) { @@ -295,7 +290,7 @@ func (e *executedMessage) Events() []*types.Event { type EventFilterManager struct { ChainStore *cstore.ChainStore - AddressResolver func(ctx context.Context, emitter abi.ActorID, ts *types.TipSet) (address.Address, bool) + AddressResolver AddressResolver MaxFilterResults int ChainIndexer index.Indexer @@ -319,11 +314,17 @@ func (m *EventFilterManager) Apply(ctx context.Context, from, to *types.TipSet) load: m.loadExecutedMessages, } - // TODO: could run this loop in parallel with errgroup if there are many filters + tsAddressResolver := tipSetCachedAddressResolver(m.AddressResolver) + + g, ctx := errgroup.WithContext(ctx) for _, f := range m.filters { - if err := f.CollectEvents(ctx, tse, false, m.AddressResolver); err != nil { - return err - } + g.Go(func() error { + return f.CollectEvents(ctx, tse, false, tsAddressResolver) + }) + } + + if err := g.Wait(); err != nil { + return err } return nil @@ -344,11 +345,17 @@ func (m *EventFilterManager) Revert(ctx context.Context, from, to *types.TipSet) load: m.loadExecutedMessages, } - // TODO: could run this loop in parallel with errgroup if there are many filters + tsAddressResolver := tipSetCachedAddressResolver(m.AddressResolver) + + g, ctx := errgroup.WithContext(ctx) for _, f := range m.filters { - if err := f.CollectEvents(ctx, tse, true, m.AddressResolver); err != nil { - return err - } + g.Go(func() error { + return f.CollectEvents(ctx, tse, true, tsAddressResolver) + }) + } + + if err := g.Wait(); err != nil { + return err } return nil @@ -507,3 +514,25 @@ func (m *EventFilterManager) loadExecutedMessages(ctx context.Context, msgTs, rc return ems, nil } + +// tipSetCachedAddressResolver returns a thread-safe function that resolves actor IDs to addresses +// with a cache that is shared across all calls to the returned function. This should only be used +// for a single TipSet, as the resolution may vary across TipSets and the cache does not account for +// this. +func tipSetCachedAddressResolver(resolver AddressResolver) func(ctx context.Context, emitter abi.ActorID, ts *types.TipSet) address.Address { + addressLookups := make(map[abi.ActorID]address.Address) + var addressLookupsLk sync.Mutex + + return func(ctx context.Context, emitter abi.ActorID, ts *types.TipSet) address.Address { + addressLookupsLk.Lock() + defer addressLookupsLk.Unlock() + + addr, ok := addressLookups[emitter] + if !ok { + addr = resolver(ctx, emitter, ts) + addressLookups[emitter] = addr + } + + return addr + } +} diff --git a/chain/events/filter/event_test.go b/chain/events/filter/event_test.go index 5ffb678c65e..aab887b5b15 100644 --- a/chain/events/filter/event_test.go +++ b/chain/events/filter/event_test.go @@ -441,7 +441,10 @@ func (a addressMap) add(actorID abi.ActorID, addr address.Address) { a[actorID] = addr } -func (a addressMap) ResolveAddress(ctx context.Context, emitter abi.ActorID, ts *types.TipSet) (address.Address, bool) { +func (a addressMap) ResolveAddress(ctx context.Context, emitter abi.ActorID, ts *types.TipSet) address.Address { ra, ok := a[emitter] - return ra, ok + if ok { + return ra + } + return address.Undef } diff --git a/node/modules/actorevent.go b/node/modules/actorevent.go index a77ae271a63..5fa02205e9d 100644 --- a/node/modules/actorevent.go +++ b/node/modules/actorevent.go @@ -105,20 +105,18 @@ func EventFilterManager(cfg config.EventsConfig) func(helpers.MetricsCtx, repo.L fm := &filter.EventFilterManager{ ChainStore: cs, ChainIndexer: ci, - // TODO: - // We don't need this address resolution anymore once https://github.com/filecoin-project/lotus/issues/11594 lands - AddressResolver: func(ctx context.Context, emitter abi.ActorID, ts *types.TipSet) (address.Address, bool) { + AddressResolver: func(ctx context.Context, emitter abi.ActorID, ts *types.TipSet) address.Address { idAddr, err := address.NewIDAddress(uint64(emitter)) if err != nil { - return address.Undef, false + return address.Undef } actor, err := sm.LoadActor(ctx, idAddr, ts) if err != nil || actor.DelegatedAddress == nil { - return idAddr, true + return idAddr } - return *actor.DelegatedAddress, true + return *actor.DelegatedAddress }, MaxFilterResults: cfg.MaxFilterResults,