Skip to content

Commit

Permalink
More post-review changes, lots of tests for SubscribeActorEvents
Browse files Browse the repository at this point in the history
Use BlockDelay as the window for receiving events on the SubscribeActorEvents
channel. We expect the user to have received the initial batch of historical
events (if any) in one block's time. For real-time events we expect them to
not fall behind by roughly one block's time.
  • Loading branch information
rvagg committed Mar 1, 2024
1 parent 3120f00 commit 8062c1d
Show file tree
Hide file tree
Showing 9 changed files with 927 additions and 216 deletions.
43 changes: 26 additions & 17 deletions chain/events/filter/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,16 @@ func isIndexedValue(b uint8) bool {
return b&(types.EventFlagIndexedKey|types.EventFlagIndexedValue) > 0
}

type EventFilter struct {
type AddressResolver func(context.Context, abi.ActorID, *types.TipSet) (address.Address, bool)

type EventFilter interface {
Filter

TakeCollectedEvents(context.Context) []*CollectedEvent
CollectEvents(context.Context, *TipSetEvents, bool, AddressResolver) error
}

type eventFilter struct {
id types.FilterID
minHeight abi.ChainEpoch // minimum epoch to apply filter or -1 if no minimum
maxHeight abi.ChainEpoch // maximum epoch to apply filter or -1 if no maximum
Expand All @@ -43,7 +52,7 @@ type EventFilter struct {
ch chan<- interface{}
}

var _ Filter = (*EventFilter)(nil)
var _ Filter = (*eventFilter)(nil)

type CollectedEvent struct {
Entries []types.EventEntry
Expand All @@ -56,24 +65,24 @@ type CollectedEvent struct {
MsgCid cid.Cid // cid of message that produced event
}

func (f *EventFilter) ID() types.FilterID {
func (f *eventFilter) ID() types.FilterID {
return f.id
}

func (f *EventFilter) SetSubChannel(ch chan<- interface{}) {
func (f *eventFilter) SetSubChannel(ch chan<- interface{}) {
f.mu.Lock()
defer f.mu.Unlock()
f.ch = ch
f.collected = nil
}

func (f *EventFilter) ClearSubChannel() {
func (f *eventFilter) ClearSubChannel() {
f.mu.Lock()
defer f.mu.Unlock()
f.ch = nil
}

func (f *EventFilter) CollectEvents(ctx context.Context, te *TipSetEvents, revert bool, resolver func(ctx context.Context, emitter abi.ActorID, ts *types.TipSet) (address.Address, bool)) error {
func (f *eventFilter) CollectEvents(ctx context.Context, te *TipSetEvents, revert bool, resolver AddressResolver) error {
if !f.matchTipset(te) {
return nil
}
Expand Down Expand Up @@ -138,13 +147,13 @@ func (f *EventFilter) CollectEvents(ctx context.Context, te *TipSetEvents, rever
return nil
}

func (f *EventFilter) setCollectedEvents(ces []*CollectedEvent) {
func (f *eventFilter) setCollectedEvents(ces []*CollectedEvent) {
f.mu.Lock()
f.collected = ces
f.mu.Unlock()
}

func (f *EventFilter) TakeCollectedEvents(ctx context.Context) []*CollectedEvent {
func (f *eventFilter) TakeCollectedEvents(ctx context.Context) []*CollectedEvent {
f.mu.Lock()
collected := f.collected
f.collected = nil
Expand All @@ -154,14 +163,14 @@ func (f *EventFilter) TakeCollectedEvents(ctx context.Context) []*CollectedEvent
return collected
}

func (f *EventFilter) LastTaken() time.Time {
func (f *eventFilter) LastTaken() time.Time {
f.mu.Lock()
defer f.mu.Unlock()
return f.lastTaken
}

// matchTipset reports whether this filter matches the given tipset
func (f *EventFilter) matchTipset(te *TipSetEvents) bool {
func (f *eventFilter) matchTipset(te *TipSetEvents) bool {
if f.tipsetCid != cid.Undef {
tsCid, err := te.Cid()
if err != nil {
Expand All @@ -179,7 +188,7 @@ func (f *EventFilter) matchTipset(te *TipSetEvents) bool {
return true
}

func (f *EventFilter) matchAddress(o address.Address) bool {
func (f *eventFilter) matchAddress(o address.Address) bool {
if len(f.addresses) == 0 {
return true
}
Expand All @@ -194,7 +203,7 @@ func (f *EventFilter) matchAddress(o address.Address) bool {
return false
}

func (f *EventFilter) matchKeys(ees []types.EventEntry) bool {
func (f *eventFilter) matchKeys(ees []types.EventEntry) bool {
if len(f.keysWithCodec) == 0 {
return true
}
Expand Down Expand Up @@ -297,7 +306,7 @@ type EventFilterManager struct {
EventIndex *EventIndex

mu sync.Mutex // guards mutations to filters
filters map[types.FilterID]*EventFilter
filters map[types.FilterID]EventFilter
currentHeight abi.ChainEpoch
}

Expand Down Expand Up @@ -364,7 +373,7 @@ func (m *EventFilterManager) Revert(ctx context.Context, from, to *types.TipSet)
}

func (m *EventFilterManager) Install(ctx context.Context, minHeight, maxHeight abi.ChainEpoch, tipsetCid cid.Cid, addresses []address.Address,
keysWithCodec map[string][]types.ActorEventBlock, excludeReverted bool) (*EventFilter, error) {
keysWithCodec map[string][]types.ActorEventBlock, excludeReverted bool) (EventFilter, error) {
m.mu.Lock()
currentHeight := m.currentHeight
m.mu.Unlock()
Expand All @@ -378,7 +387,7 @@ func (m *EventFilterManager) Install(ctx context.Context, minHeight, maxHeight a
return nil, xerrors.Errorf("new filter id: %w", err)
}

f := &EventFilter{
f := &eventFilter{
id: id,
minHeight: minHeight,
maxHeight: maxHeight,
Expand All @@ -390,14 +399,14 @@ func (m *EventFilterManager) Install(ctx context.Context, minHeight, maxHeight a

if m.EventIndex != nil && minHeight != -1 && minHeight < currentHeight {
// Filter needs historic events
if err := m.EventIndex.PrefillFilter(ctx, f, excludeReverted); err != nil {
if err := m.EventIndex.prefillFilter(ctx, f, excludeReverted); err != nil {
return nil, err
}
}

m.mu.Lock()
if m.filters == nil {
m.filters = make(map[types.FilterID]*EventFilter)
m.filters = make(map[types.FilterID]EventFilter)
}
m.filters[id] = f
m.mu.Unlock()
Expand Down
30 changes: 15 additions & 15 deletions chain/events/filter/event_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,13 +86,13 @@ func TestEventFilterCollectEvents(t *testing.T) {

testCases := []struct {
name string
filter *EventFilter
filter *eventFilter
te *TipSetEvents
want []*CollectedEvent
}{
{
name: "nomatch tipset min height",
filter: &EventFilter{
filter: &eventFilter{
minHeight: 14001,
maxHeight: -1,
},
Expand All @@ -101,7 +101,7 @@ func TestEventFilterCollectEvents(t *testing.T) {
},
{
name: "nomatch tipset max height",
filter: &EventFilter{
filter: &eventFilter{
minHeight: -1,
maxHeight: 13999,
},
Expand All @@ -110,7 +110,7 @@ func TestEventFilterCollectEvents(t *testing.T) {
},
{
name: "match tipset min height",
filter: &EventFilter{
filter: &eventFilter{
minHeight: 14000,
maxHeight: -1,
},
Expand All @@ -119,7 +119,7 @@ func TestEventFilterCollectEvents(t *testing.T) {
},
{
name: "match tipset cid",
filter: &EventFilter{
filter: &eventFilter{
minHeight: -1,
maxHeight: -1,
tipsetCid: cid14000,
Expand All @@ -129,7 +129,7 @@ func TestEventFilterCollectEvents(t *testing.T) {
},
{
name: "nomatch address",
filter: &EventFilter{
filter: &eventFilter{
minHeight: -1,
maxHeight: -1,
addresses: []address.Address{a2},
Expand All @@ -139,7 +139,7 @@ func TestEventFilterCollectEvents(t *testing.T) {
},
{
name: "match address",
filter: &EventFilter{
filter: &eventFilter{
minHeight: -1,
maxHeight: -1,
addresses: []address.Address{a1},
Expand All @@ -149,7 +149,7 @@ func TestEventFilterCollectEvents(t *testing.T) {
},
{
name: "match one entry",
filter: &EventFilter{
filter: &eventFilter{
minHeight: -1,
maxHeight: -1,
keysWithCodec: keysToKeysWithCodec(map[string][][]byte{
Expand All @@ -163,7 +163,7 @@ func TestEventFilterCollectEvents(t *testing.T) {
},
{
name: "match one entry with alternate values",
filter: &EventFilter{
filter: &eventFilter{
minHeight: -1,
maxHeight: -1,
keysWithCodec: keysToKeysWithCodec(map[string][][]byte{
Expand All @@ -179,7 +179,7 @@ func TestEventFilterCollectEvents(t *testing.T) {
},
{
name: "nomatch one entry by missing value",
filter: &EventFilter{
filter: &eventFilter{
minHeight: -1,
maxHeight: -1,
keysWithCodec: keysToKeysWithCodec(map[string][][]byte{
Expand All @@ -194,7 +194,7 @@ func TestEventFilterCollectEvents(t *testing.T) {
},
{
name: "nomatch one entry by missing key",
filter: &EventFilter{
filter: &eventFilter{
minHeight: -1,
maxHeight: -1,
keysWithCodec: keysToKeysWithCodec(map[string][][]byte{
Expand All @@ -208,7 +208,7 @@ func TestEventFilterCollectEvents(t *testing.T) {
},
{
name: "match one entry with multiple keys",
filter: &EventFilter{
filter: &eventFilter{
minHeight: -1,
maxHeight: -1,
keysWithCodec: keysToKeysWithCodec(map[string][][]byte{
Expand All @@ -225,7 +225,7 @@ func TestEventFilterCollectEvents(t *testing.T) {
},
{
name: "nomatch one entry with one mismatching key",
filter: &EventFilter{
filter: &eventFilter{
minHeight: -1,
maxHeight: -1,
keysWithCodec: keysToKeysWithCodec(map[string][][]byte{
Expand All @@ -242,7 +242,7 @@ func TestEventFilterCollectEvents(t *testing.T) {
},
{
name: "nomatch one entry with one mismatching value",
filter: &EventFilter{
filter: &eventFilter{
minHeight: -1,
maxHeight: -1,
keysWithCodec: keysToKeysWithCodec(map[string][][]byte{
Expand All @@ -259,7 +259,7 @@ func TestEventFilterCollectEvents(t *testing.T) {
},
{
name: "nomatch one entry with one unindexed key",
filter: &EventFilter{
filter: &eventFilter{
minHeight: -1,
maxHeight: -1,
keysWithCodec: keysToKeysWithCodec(map[string][][]byte{
Expand Down
2 changes: 1 addition & 1 deletion chain/events/filter/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -481,7 +481,7 @@ func (ei *EventIndex) CollectEvents(ctx context.Context, te *TipSetEvents, rever
}

// PrefillFilter fills a filter's collection of events from the historic index
func (ei *EventIndex) PrefillFilter(ctx context.Context, f *EventFilter, excludeReverted bool) error {
func (ei *EventIndex) prefillFilter(ctx context.Context, f *eventFilter, excludeReverted bool) error {
clauses := []string{}
values := []any{}
joins := []string{}
Expand Down
Loading

0 comments on commit 8062c1d

Please sign in to comment.