-
Notifications
You must be signed in to change notification settings - Fork 66
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(dscache): Fill dscache when saving, using event.Bus. #1133
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Exciting stuff! I'd like to make sure we get off on the right foot with the event bus. Locking down the intended uses for events is the best way to avoid event nightmares IMHO.
event/ds_change.go
Outdated
"github.com/qri-io/dataset" | ||
) | ||
|
||
var ( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can these be const
?
event/ds_change.go
Outdated
|
||
var ( | ||
// ETDatasetInit is for events that initialize datasets | ||
ETDatasetInit = Topic("ds:init") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we should namespace events according to the package that emits them. This would be
ETLogbookDatasetNameInitialized = Topic("logbook:DatasetNameInitialized")
Given that events are this super loose thing, I think it's appropriate to use long, verbose names describing exactly what they're doing, including a tense. A general form:
package_name ':' MethodNameWithTense
dscache/dscache.go
Outdated
// Subscribe watches a logbook for actions that change the state | ||
func (d *Dscache) Subscribe(bus event.Bus) { | ||
d.Messages = bus.Subscribe(event.ETDatasetInit, event.ETDatasetChange) | ||
go func() { | ||
for { | ||
e := <-d.Messages | ||
if dsChange, ok := e.Payload.(event.DatasetChangeEvent); ok { | ||
if e.Topic == event.ETDatasetInit { | ||
if err := d.updateInitDataset(dsChange); err != nil { | ||
log.Error(err) | ||
} | ||
} else if e.Topic == event.ETDatasetChange { | ||
if err := d.updateMoveCursor(dsChange); err != nil { | ||
log.Error(err) | ||
} | ||
} | ||
} | ||
} | ||
}() | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok lots to talk about here. Here's how I think we should refactor:
// We need to not leak goroutines. The context passed into the `New` constructor
// needs to be plumbed into here and cleaned up if the passed-in context is
// cancelled.
func (d *Dscache) subscribe(ctx context.Context, bus event.Bus) {
// Subsystems that are aware of events should not have an event channel field.
// This allows code that breaks the encapsulation of the `subscribe` method.
// Instead `subscribe` should use a local variable.
eventsCh := bus.Subscribe(event.ETDatasetInit, event.ETDatasetChange)
go func() {
for {
select {
// The statement that listens for events should use the "commma ok"
// pattern to check for a closed channel. If the event bus context is
// cancelled before this goroutine exits, the event bus should close
// listener channels. Breaking on !ok will gracefully close
case e, ok := <-eventsCh:
if !ok {
break
}
if dsChange, ok := e.Payload.(event.DatasetChangeEvent); ok {
if e.Topic == event.ETDatasetInit {
if err := d.updateInitDataset(dsChange); err != nil {
log.Error(err)
}
} else if e.Topic == event.ETDatasetChange {
if err := d.updateMoveCursor(dsChange); err != nil {
log.Error(err)
}
}
}
case <-ctx.Done():
// TODO (b5): add unsubscribe method to bus interface, the arg for
// Unsubscribe should be the channel variable itself, which will be
// available in this scope:
// bus.Unsubscribe(eventsCh)
break
}
}()
}
}
The general pattern I like to see emerge is a private subscribe
method that encapsulates all event handling for a subsystem. We should use the same private method name across all subsystems for clarity. subscribe
should always be called within the constructor and listen to the context given to the constructor. Passing a non-nil event bus to a constructor should always subscribe, cutting down on potential bugs caused by requiring package consumers to both construct & subscribe.
Finally, I think this is a good enough reason to add an Unsubscribe
method to the bus interface, and call it if the context is cancelled. Currently we can get away with this because in all (production) cases the context given to Dscache
will live for the same duration as the bus itself. But this might not always be the case (I'm thinking of other subsystems like cron
). Having all subscribers clean up on close means we won't have to rely on assumption of aligned context duration. I'd be happy to PR in the Unsubscribe method.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This all makes sense, added as described
dscache/dscache.go
Outdated
ProfileIDToUsername map[string]string | ||
Messages <-chan event.Event |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's drop this field, see big Subscribe
note
logbook/logbook.go
Outdated
@@ -92,6 +93,7 @@ type Book struct { | |||
pk crypto.PrivKey | |||
authorID string | |||
authorName string | |||
bus event.Bus |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm a fan of this. We may want to consider adding a Publisher
interface to the event
package that only has the publish method, and make this a publisher field. Setup would be the same, just would add extra assurances that people can't subscribe in weird places.
We could also define a NilPublisher
implementation that is nil-callable, which would drop the need for
if book.bus !=nil {
every time we want publish.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added both
Addressed your comments |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks so much! As soon as CI passes, let's get this merged
Oh, just realized event.Bus is making these tests non-deterministic because the dscache may render before the event is fully handled. Also, probably need to put a mutex around the dscache update handler. I'll have to fix these tomorrow, then will merge. |
a2d02cc
to
31e0a94
Compare
Logbook informs event.Bus of dataset inits and saves. Init events are sent to ds cache, as well as save events. Remove old logbook.action. When saving a dataset for the first commit, the flag --use-dscache will opt-in to dscache by creating a new instance.
Pass bus to dscache constructor. Subscribe as a private method inside the constructor. Add an unsubscribe method to the bus, call it when the subscriber is finished.
037b31a
to
90e3b4c
Compare
09324da
to
8231792
Compare
Add subscribe method to filesystem watcher. Add locks to dscache. Sync tests. Sync.wait has to count number of subscribers. Wait returns any errors that occur, delivery via Acknowledge method. Remove syncs when they are finished with Wait.
8231792
to
ff767fb
Compare
Discussed offline, we're rethinking this approach. Using an event.Bus to update dscache doesn't seem necessary, and leads to lots of bad things like non-deterministic behavior and race conditions. Instead, we're just going to use a simple callback function from logbook. Leaving this PR for now for posterity, but going to switch over to this simpler approach, in #1140. |
Here's my notes from our chat: Ugh, just found a problem. The func TestSyncMultipleEventPublish(t *testing.T) {
ctx := context.Background()
counter := syncCounter{}
bus := NewBus(ctx)
ch1 := bus.Subscribe(ETMainTestEvent)
handleEvents(bus, ch1, nil, &counter)
s := bus.Synchronizer()
go bus.Publish(ETMainTestEvent, 1)
go bus.Publish(ETMainTestEvent, 1)
time.Sleep(100)
s.Wait()
if counter.Count != 1 {
t.Errorf("expected Count to be 1, got %d", counter.Count)
}
} We could solve this by allocating an identifier with each distinct "request", and attaching that to the syncronizer. In go it'd be best to use // NewRequestContext creates a request context from the event package with an
// embedded request ID value that can be used to distinguish event subscriptions
func NewRequestContext() context.Context {
return context.WithValue(EventID, newRequestID(), context.Background())
}
func TestSyncMultipleEventPublish(t *testing.T) {
// bus intentional uses a different context
bus := NewBus(context.Background())
counter := syncCounter{}
s := bus.Syncronizer(reqCtx)
handleEvents(bus, ch1, nil, &counter)
reqCtx := NewContext()
ch1 := bus.Subscribe(ETMainTestEvent)
// publish the event
bus.Publish(reqCtx, ETMainTestEvent, 1)
// bus.Publish(ETMainTestEvent, 1)
// wait will internally use the request identifier to distinguish contexts,
// counting only references that are published with the same request context
err := <- s.Wait(reqCtx)
if err != nil {
t.Fatal(err)
}
if counter.Count != 1 {
t.Errorf("expected Count to be 1, got %d", counter.Count)
}
} Relying on reference countingthe Syncronizer pattern is too niive about reference counting. The reference count may drop to zero before the process is truly finished: Here's an example of an evented
In the above example dscache needs to react to both remote events, but if any events are adknowledged before all events are published, the reference count will drop to zero prematurely, causing the We can have two patterns.I think the problem here we're thinking too either-or about using an event bus. We have patterns that already work, and some places where they really don't. We should only use patterns on the latter. For any given set of business logic, there are functions on the critical path. Critical path functions MUST complete (either with success or error) before a request is considered finished. For this stuff, our current pattern of just calling functions in sequence, and maybe using an We want the critical path to be declartive. Events are for side effects. It seems many of our packages have been designed with this critical path/side effect diachotomy in mind: critical path packages
side-effect packages
Some observations we can draw from this:
Other things we consideredI was thinking a // MaybeSubscribeOnce
promise := bus.MaybeSubscribeOnce(reqCtx, inst.dscache, ETDscacheRefAdded) But this doesn't solve the core problem of keeping core logic & side effects in separate paradigms. |
Ok, we’ve pulled the best bits out of this. You’ve served us well #1133, rest now. |
Logbook informs event.Bus of dataset inits and saves. Init events are sent to ds cache, as well as save events. Remove old logbook.action. When saving a dataset for the first commit, the flag --use-dscache will opt-in to dscache by creating a new instance.