Skip to content

Commit

Permalink
fix: atomically get head when registering an observer
Browse files Browse the repository at this point in the history
This lets us always call check (accurately).
  • Loading branch information
Stebalien committed Aug 20, 2021
1 parent 09da953 commit bd3f313
Show file tree
Hide file tree
Showing 5 changed files with 61 additions and 62 deletions.
10 changes: 3 additions & 7 deletions chain/events/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,17 +50,13 @@ func NewEventsWithConfidence(ctx context.Context, api EventAPI, gcConfidence abi
cache := newCache(api, gcConfidence)

ob := newObserver(cache, gcConfidence)
he := newHeightEvents(cache, gcConfidence)
headChange := newHCEvents(cache)

// Cache first. Observers are ordered and we always want to fill the cache first.
ob.Observe(cache.observer())
ob.Observe(he.observer())
ob.Observe(headChange.observer())
if err := ob.start(ctx); err != nil {
return nil, err
}

he := newHeightEvents(cache, ob, gcConfidence)
headChange := newHCEvents(cache, ob)

return &Events{ob, he, headChange}, nil
}

Expand Down
25 changes: 10 additions & 15 deletions chain/events/events_called.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,10 +69,9 @@ type queuedEvent struct {
type hcEvents struct {
cs EventAPI

lk sync.Mutex
lastTs *types.TipSet

lk sync.Mutex

ctr triggerID

// TODO: get rid of trigger IDs and just use pointers as keys.
Expand All @@ -93,7 +92,7 @@ type hcEvents struct {
watcherEvents
}

func newHCEvents(api EventAPI) *hcEvents {
func newHCEvents(api EventAPI, obs *observer) *hcEvents {
e := &hcEvents{
cs: api,
confQueue: map[triggerH]map[msgH][]*queuedEvent{},
Expand All @@ -105,15 +104,16 @@ func newHCEvents(api EventAPI) *hcEvents {
e.messageEvents = newMessageEvents(e, api)
e.watcherEvents = newWatcherEvents(e, api)

// We need to take the lock as the observer could immediately try calling us.
e.lk.Lock()
e.lastTs = obs.Observe((*hcEventsObserver)(e))
e.lk.Unlock()

return e
}

type hcEventsObserver hcEvents

func (e *hcEvents) observer() TipSetObserver {
return (*hcEventsObserver)(e)
}

func (e *hcEventsObserver) Apply(ctx context.Context, from, to *types.TipSet) error {
e.lk.Lock()
defer e.lk.Unlock()
Expand Down Expand Up @@ -284,14 +284,9 @@ func (e *hcEvents) onHeadChanged(ctx context.Context, check CheckFunc, hnd Event
defer e.lk.Unlock()

// Check if the event has already occurred
more := true
done := false
if e.lastTs != nil {
var err error
done, more, err = check(ctx, e.lastTs)
if err != nil {
return 0, xerrors.Errorf("called check error (h: %d): %w", e.lastTs.Height(), err)
}
done, more, err := check(ctx, e.lastTs)
if err != nil {
return 0, xerrors.Errorf("called check error (h: %d): %w", e.lastTs.Height(), err)
}
if done {
timeout = NoTimeout
Expand Down
21 changes: 6 additions & 15 deletions chain/events/events_height.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,17 @@ type heightEvents struct {
lastGc abi.ChainEpoch //nolint:structcheck
}

func newHeightEvents(api EventAPI, gcConfidence abi.ChainEpoch) *heightEvents {
return &heightEvents{
func newHeightEvents(api EventAPI, obs *observer, gcConfidence abi.ChainEpoch) *heightEvents {
he := &heightEvents{
api: api,
gcConfidence: gcConfidence,
tsHeights: map[abi.ChainEpoch][]*heightHandler{},
triggerHeights: map[abi.ChainEpoch][]*heightHandler{},
}
he.lk.Lock()
he.head = obs.Observe((*heightEventsObserver)(he))
he.lk.Unlock()
return he
}

// ChainAt invokes the specified `HeightHandler` when the chain reaches the
Expand Down Expand Up @@ -69,15 +73,6 @@ func (e *heightEvents) ChainAt(ctx context.Context, hnd HeightHandler, rev Rever
e.lk.Lock()
for {
head := e.head

// If we haven't initialized yet, store the trigger and move on.
if head == nil {
e.triggerHeights[triggerAt] = append(e.triggerHeights[triggerAt], handler)
e.tsHeights[h] = append(e.tsHeights[h], handler)
e.lk.Unlock()
return nil
}

if head.Height() >= h {
// Head is past the handler height. We at least need to stash the tipset to
// avoid doing this from the main event loop.
Expand Down Expand Up @@ -152,10 +147,6 @@ func (e *heightEvents) ChainAt(ctx context.Context, hnd HeightHandler, rev Rever
}
}

func (e *heightEvents) observer() TipSetObserver {
return (*heightEventsObserver)(e)
}

// Updates the head and garbage collects if we're 2x over our garbage collection confidence period.
func (e *heightEventsObserver) updateHead(h *types.TipSet) {
e.lk.Lock()
Expand Down
4 changes: 0 additions & 4 deletions chain/events/events_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -875,8 +875,6 @@ func TestCalledTimeout(t *testing.T) {
events, err = NewEvents(context.Background(), fcs)
require.NoError(t, err)

fcs.advance(0, 1, 0, nil)

err = events.Called(context.Background(), func(ctx context.Context, ts *types.TipSet) (d bool, m bool, e error) {
return true, true, nil
}, func(msg *types.Message, rec *types.MessageReceipt, ts *types.TipSet, curH abi.ChainEpoch) (bool, error) {
Expand Down Expand Up @@ -1298,8 +1296,6 @@ func TestStateChangedTimeout(t *testing.T) {
events, err = NewEvents(context.Background(), fcs)
require.NoError(t, err)

fcs.advance(0, 1, 0, nil)

err = events.StateChanged(func(ctx context.Context, ts *types.TipSet) (d bool, m bool, e error) {
return true, true, nil
}, func(oldTs, newTs *types.TipSet, data StateChange, curH abi.ChainEpoch) (bool, error) {
Expand Down
63 changes: 42 additions & 21 deletions chain/events/observer.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,25 +18,26 @@ import (
type observer struct {
api EventAPI

lk sync.Mutex
gcConfidence abi.ChainEpoch

ready chan struct{}

lk sync.Mutex
head *types.TipSet
maxHeight abi.ChainEpoch

observers []TipSetObserver
}

func newObserver(api EventAPI, gcConfidence abi.ChainEpoch) *observer {
return &observer{
func newObserver(api *cache, gcConfidence abi.ChainEpoch) *observer {
obs := &observer{
api: api,
gcConfidence: gcConfidence,

ready: make(chan struct{}),
observers: []TipSetObserver{},
}
obs.Observe(api.observer())
return obs
}

func (o *observer) start(ctx context.Context) error {
Expand Down Expand Up @@ -100,12 +101,18 @@ func (o *observer) listenHeadChangesOnce(ctx context.Context) error {
return xerrors.Errorf("expected first head notification type to be 'current', was '%s'", cur[0].Type)
}

head := cur[0].Val
curHead := cur[0].Val

o.lk.Lock()
if o.head == nil {
o.head = head
o.head = curHead
close(o.ready)
} else if !o.head.Equals(head) {
changes, err := o.api.ChainGetPath(ctx, o.head.Key(), head.Key())
}
startHead := o.head
o.lk.Unlock()

if !startHead.Equals(curHead) {
changes, err := o.api.ChainGetPath(ctx, startHead.Key(), curHead.Key())
if err != nil {
return xerrors.Errorf("failed to get path from last applied tipset to head: %w", err)
}
Expand Down Expand Up @@ -152,26 +159,31 @@ func (o *observer) headChange(ctx context.Context, rev, app []*types.TipSet) err
ctx, span := trace.StartSpan(ctx, "events.HeadChange")
span.AddAttributes(trace.Int64Attribute("reverts", int64(len(rev))))
span.AddAttributes(trace.Int64Attribute("applies", int64(len(app))))

o.lk.Lock()
head := o.head
o.lk.Unlock()

defer func() {
span.AddAttributes(trace.Int64Attribute("endHeight", int64(o.head.Height())))
span.AddAttributes(trace.Int64Attribute("endHeight", int64(head.Height())))
span.End()
}()

// NOTE: bailing out here if the head isn't what we expected is fine. We'll re-start the
// entire process and handle any strange reorgs.
for i, from := range rev {
if !from.Equals(o.head) {
if !from.Equals(head) {
return xerrors.Errorf(
"expected to revert %s (%d), reverting %s (%d)",
o.head.Key(), o.head.Height(), from.Key(), from.Height(),
head.Key(), head.Height(), from.Key(), from.Height(),
)
}
var to *types.TipSet
if i+1 < len(rev) {
// If we have more reverts, the next revert is the next head.
to = rev[i+1]
} else {
// At the end of the revert sequenece, we need to looup the joint tipset
// At the end of the revert sequenece, we need to lookup the joint tipset
// between the revert sequence and the apply sequence.
var err error
to, err = o.api.ChainGetTipSet(ctx, from.Parents())
Expand All @@ -181,9 +193,14 @@ func (o *observer) headChange(ctx context.Context, rev, app []*types.TipSet) err
}
}

// Get the observers late in case an observer registers/unregisters itself.
// Get the current observers and atomically set the head.
//
// 1. We need to get the observers every time in case some registered/deregistered.
// 2. We need to atomically set the head so new observers don't see events twice or
// skip them.
o.lk.Lock()
observers := o.observers
o.head = to
o.lk.Unlock()

for _, obs := range observers {
Expand All @@ -196,39 +213,43 @@ func (o *observer) headChange(ctx context.Context, rev, app []*types.TipSet) err
log.Errorf("reverted past finality, from %d to %d", o.maxHeight, to.Height())
}

o.head = to
head = to
}

for _, to := range app {
if to.Parents() != o.head.Key() {
if to.Parents() != head.Key() {
return xerrors.Errorf(
"cannot apply %s (%d) with parents %s on top of %s (%d)",
to.Key(), to.Height(), to.Parents(), o.head.Key(), o.head.Height(),
to.Key(), to.Height(), to.Parents(), head.Key(), head.Height(),
)
}

// Get the observers late in case an observer registers/unregisters itself.
o.lk.Lock()
observers := o.observers
o.head = to
o.lk.Unlock()

for _, obs := range observers {
if err := obs.Apply(ctx, o.head, to); err != nil {
if err := obs.Apply(ctx, head, to); err != nil {
log.Errorf("observer %T failed to revert tipset %s (%d) with: %s", obs, to.Key(), to.Height(), err)
}
}
o.head = to
if to.Height() > o.maxHeight {
o.maxHeight = to.Height()
}

head = to
}
return nil
}

// TODO: add a confidence level so we can have observers with difference levels of confidence
func (o *observer) Observe(obs TipSetObserver) {
// Observe registers the observer, and returns the current tipset. The observer is guaranteed to
// observe events starting at this tipset.
//
// Returns nil if the observer hasn't started yet (but still registers).
func (o *observer) Observe(obs TipSetObserver) *types.TipSet {
o.lk.Lock()
defer o.lk.Unlock()
o.observers = append(o.observers, obs)
return o.head
}

0 comments on commit bd3f313

Please sign in to comment.