Skip to content

Commit

Permalink
chore(events): improve perf for parallel event filter matching
Browse files Browse the repository at this point in the history
1. Cache address look-ups for the given tipset across filters
2. Run the filters in an errgroup
  • Loading branch information
rvagg committed Nov 20, 2024
1 parent 2cd6f40 commit 66a83b5
Show file tree
Hide file tree
Showing 4 changed files with 62 additions and 31 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# Lotus changelog

# UNRELEASED
- Improve eth filter performance for nodes serving many clients. ([filecoin-project/lotus#12603](https://github.com/filecoin-project/lotus/pull/12603))

## Bug Fixes

Expand Down
75 changes: 52 additions & 23 deletions chain/events/filter/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

"github.com/ipfs/go-cid"
cbg "github.com/whyrusleeping/cbor-gen"
"golang.org/x/sync/errgroup"
"golang.org/x/xerrors"

"github.com/filecoin-project/go-address"
Expand All @@ -28,7 +29,10 @@ func isIndexedValue(b uint8) bool {
return b&(types.EventFlagIndexedKey|types.EventFlagIndexedValue) > 0
}

type AddressResolver func(context.Context, abi.ActorID, *types.TipSet) (address.Address, bool)
// AddressResolver is a function that resolves an actor ID to an address. If the
// actor ID cannot be resolved to an address, the function must return
// address.Undef.
type AddressResolver func(context.Context, abi.ActorID, *types.TipSet) address.Address

type EventFilter interface {
Filter
Expand Down Expand Up @@ -77,9 +81,6 @@ func (f *eventFilter) CollectEvents(ctx context.Context, te *TipSetEvents, rever
return nil
}

// cache of lookups between actor id and f4 address
addressLookups := make(map[abi.ActorID]address.Address)

ems, err := te.messages(ctx)
if err != nil {
return xerrors.Errorf("load executed messages: %w", err)
Expand All @@ -89,16 +90,10 @@ func (f *eventFilter) CollectEvents(ctx context.Context, te *TipSetEvents, rever

for msgIdx, em := range ems {
for _, ev := range em.Events() {
// lookup address corresponding to the actor id
addr, found := addressLookups[ev.Emitter]
if !found {
var ok bool
addr, ok = resolver(ctx, ev.Emitter, te.rctTs)
if !ok {
// not an address we will be able to match against
continue
}
addressLookups[ev.Emitter] = addr
addr := resolver(ctx, ev.Emitter, te.rctTs)
if addr == address.Undef {
// not an address we will be able to match against
continue
}

if !f.matchAddress(addr) {
Expand Down Expand Up @@ -295,7 +290,7 @@ func (e *executedMessage) Events() []*types.Event {

type EventFilterManager struct {
ChainStore *cstore.ChainStore
AddressResolver func(ctx context.Context, emitter abi.ActorID, ts *types.TipSet) (address.Address, bool)
AddressResolver AddressResolver
MaxFilterResults int
ChainIndexer index.Indexer

Expand All @@ -319,11 +314,17 @@ func (m *EventFilterManager) Apply(ctx context.Context, from, to *types.TipSet)
load: m.loadExecutedMessages,
}

// TODO: could run this loop in parallel with errgroup if there are many filters
tsAddressResolver := tipSetCachedAddressResolver(m.AddressResolver)

g, ctx := errgroup.WithContext(ctx)
for _, f := range m.filters {
if err := f.CollectEvents(ctx, tse, false, m.AddressResolver); err != nil {
return err
}
g.Go(func() error {
return f.CollectEvents(ctx, tse, false, tsAddressResolver)
})
}

if err := g.Wait(); err != nil {
return err
}

return nil
Expand All @@ -344,11 +345,17 @@ func (m *EventFilterManager) Revert(ctx context.Context, from, to *types.TipSet)
load: m.loadExecutedMessages,
}

// TODO: could run this loop in parallel with errgroup if there are many filters
tsAddressResolver := tipSetCachedAddressResolver(m.AddressResolver)

g, ctx := errgroup.WithContext(ctx)
for _, f := range m.filters {
if err := f.CollectEvents(ctx, tse, true, m.AddressResolver); err != nil {
return err
}
g.Go(func() error {
return f.CollectEvents(ctx, tse, true, tsAddressResolver)
})
}

if err := g.Wait(); err != nil {
return err
}

return nil
Expand Down Expand Up @@ -507,3 +514,25 @@ func (m *EventFilterManager) loadExecutedMessages(ctx context.Context, msgTs, rc

return ems, nil
}

// tipSetCachedAddressResolver returns a thread-safe function that resolves actor IDs to addresses
// with a cache that is shared across all calls to the returned function. This should only be used
// for a single TipSet, as the resolution may vary across TipSets and the cache does not account for
// this.
func tipSetCachedAddressResolver(resolver AddressResolver) func(ctx context.Context, emitter abi.ActorID, ts *types.TipSet) address.Address {
addressLookups := make(map[abi.ActorID]address.Address)
var addressLookupsLk sync.Mutex

return func(ctx context.Context, emitter abi.ActorID, ts *types.TipSet) address.Address {
addressLookupsLk.Lock()
defer addressLookupsLk.Unlock()

addr, ok := addressLookups[emitter]
if !ok {
addr = resolver(ctx, emitter, ts)
addressLookups[emitter] = addr
}

return addr
}
}
7 changes: 5 additions & 2 deletions chain/events/filter/event_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -441,7 +441,10 @@ func (a addressMap) add(actorID abi.ActorID, addr address.Address) {
a[actorID] = addr
}

func (a addressMap) ResolveAddress(ctx context.Context, emitter abi.ActorID, ts *types.TipSet) (address.Address, bool) {
func (a addressMap) ResolveAddress(ctx context.Context, emitter abi.ActorID, ts *types.TipSet) address.Address {
ra, ok := a[emitter]
return ra, ok
if ok {
return ra
}
return address.Undef
}
10 changes: 4 additions & 6 deletions node/modules/actorevent.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,20 +105,18 @@ func EventFilterManager(cfg config.EventsConfig) func(helpers.MetricsCtx, repo.L
fm := &filter.EventFilterManager{
ChainStore: cs,
ChainIndexer: ci,
// TODO:
// We don't need this address resolution anymore once https://github.com/filecoin-project/lotus/issues/11594 lands
AddressResolver: func(ctx context.Context, emitter abi.ActorID, ts *types.TipSet) (address.Address, bool) {
AddressResolver: func(ctx context.Context, emitter abi.ActorID, ts *types.TipSet) address.Address {
idAddr, err := address.NewIDAddress(uint64(emitter))
if err != nil {
return address.Undef, false
return address.Undef
}

actor, err := sm.LoadActor(ctx, idAddr, ts)
if err != nil || actor.DelegatedAddress == nil {
return idAddr, true
return idAddr
}

return *actor.DelegatedAddress, true
return *actor.DelegatedAddress
},

MaxFilterResults: cfg.MaxFilterResults,
Expand Down

0 comments on commit 66a83b5

Please sign in to comment.