Skip to content

Commit

Permalink
refactor(event): re-work events to be synchronus
Browse files Browse the repository at this point in the history
Merge pull request #1430 from qri-io/feat_sync_events
  • Loading branch information
b5 authored Jul 7, 2020
2 parents 8dfadae + c296f53 commit bc071a0
Show file tree
Hide file tree
Showing 36 changed files with 357 additions and 422 deletions.
10 changes: 6 additions & 4 deletions api/test_runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
)

type APITestRunner struct {
cancelCtx context.CancelFunc
Node *p2p.QriNode
NodeTeardown func()
Inst *lib.Instance
Expand All @@ -27,11 +28,11 @@ type APITestRunner struct {
}

func NewAPITestRunner(t *testing.T) *APITestRunner {
run := APITestRunner{}
run.Node, run.NodeTeardown = newTestNode(t)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
run := APITestRunner{
cancelCtx: cancel,
}
run.Node, run.NodeTeardown = newTestNode(t)

run.Inst = newTestInstanceWithProfileFromNode(ctx, run.Node)

Expand All @@ -57,6 +58,7 @@ func NewAPITestRunner(t *testing.T) *APITestRunner {
func (r *APITestRunner) Delete() {
os.RemoveAll(r.TmpDir)
APIVersion = r.PrevXformVer
r.cancelCtx()
r.NodeTeardown()
}

Expand Down
16 changes: 0 additions & 16 deletions api/websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
ma "github.com/multiformats/go-multiaddr"
manet "github.com/multiformats/go-multiaddr-net"
"github.com/qri-io/qri/base/component"
"github.com/qri-io/qri/event"
"github.com/qri-io/qri/p2p"
"github.com/qri-io/qri/watchfs"
"nhooyr.io/websocket"
Expand Down Expand Up @@ -72,12 +71,6 @@ func (s Server) ServeWebsocket(ctx context.Context) {
}
defer srv.Close()

// Subscribe to FSI link creation events, which will affect filesystem watching
// TODO(dlong): A good example of tight coupling causing an issue: The Websocket
// implementation doesn't need to know about these events, but the FilesystemWatcher
// does. Ideally, this Subscribe call would happen along with the latter, not the former.
busEvents := s.Instance.Bus().Subscribe(event.ETFSICreateLinkEvent)

known := component.GetKnownFilenames()

// Filesystem events are forwarded to the websocket. In the future, this may be
Expand All @@ -86,15 +79,6 @@ func (s Server) ServeWebsocket(ctx context.Context) {
go func() {
for {
select {
case e := <-busEvents:
log.Debugf("bus event: %s\n", e)
if fce, ok := e.Payload.(event.FSICreateLinkEvent); ok {
s.Instance.Watcher.Add(watchfs.EventPath{
Path: fce.FSIPath,
Username: fce.Username,
Dsname: fce.Dsname,
})
}
case fse := <-fsmessages:
if s.filterEvent(fse, known) {
log.Debugf("filesys event: %s\n", fse)
Expand Down
3 changes: 2 additions & 1 deletion base/base_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/qri-io/ioes"
"github.com/qri-io/qfs"
"github.com/qri-io/qfs/muxfs"
"github.com/qri-io/qri/event"
"github.com/qri-io/qri/repo"
"github.com/qri-io/qri/repo/profile"
reporef "github.com/qri-io/qri/repo/ref"
Expand Down Expand Up @@ -56,7 +57,7 @@ func newTestRepo(t *testing.T) repo.Repo {
t.Fatal(err)
}

mr, err := repo.NewMemRepo(ctx, testPeerProfile, mux)
mr, err := repo.NewMemRepo(ctx, testPeerProfile, mux, event.NilBus)
if err != nil {
t.Fatal(err.Error())
}
Expand Down
3 changes: 2 additions & 1 deletion base/log_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/qri-io/qfs/localfs"
testPeers "github.com/qri-io/qri/config/test"
"github.com/qri-io/qri/dsref"
"github.com/qri-io/qri/event"
"github.com/qri-io/qri/logbook"
"github.com/qri-io/qri/repo"
"github.com/qri-io/qri/repo/profile"
Expand Down Expand Up @@ -210,7 +211,7 @@ func TestConstructDatasetLogFromHistory(t *testing.T) {
if err != nil {
t.Fatal(err)
}
book, err := logbook.NewJournal(p.PrivKey, p.Peername, mr.Filesystem(), "/map/logbook.qfb")
book, err := logbook.NewJournal(p.PrivKey, p.Peername, event.NilBus, mr.Filesystem(), "/map/logbook.qfb")
if err != nil {
t.Fatal(err)
}
Expand Down
3 changes: 2 additions & 1 deletion base/save_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/qri-io/dataset"
"github.com/qri-io/qfs"
"github.com/qri-io/qfs/muxfs"
"github.com/qri-io/qri/event"
"github.com/qri-io/qri/repo"
)

Expand Down Expand Up @@ -104,7 +105,7 @@ func TestCreateDataset(t *testing.T) {
if err != nil {
t.Fatal(err)
}
r, err := repo.NewMemRepo(ctx, testPeerProfile, fs)
r, err := repo.NewMemRepo(ctx, testPeerProfile, fs, event.NilBus)
if err != nil {
t.Fatal(err.Error())
}
Expand Down
10 changes: 5 additions & 5 deletions cmd/save_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/qri-io/qri/base/dsfs"
"github.com/qri-io/qri/dscache"
qrierr "github.com/qri-io/qri/errors"
"github.com/qri-io/qri/event"
)

func TestSaveComplete(t *testing.T) {
Expand Down Expand Up @@ -563,8 +564,7 @@ func TestSaveDscacheFirstCommit(t *testing.T) {
}

cacheFilename := cache.Filename
// TODO(dustmop): Do we need to pass a book?
cache = dscache.NewDscache(ctx, fs, nil, run.Username(), cacheFilename)
cache = dscache.NewDscache(ctx, fs, event.NilBus, run.Username(), cacheFilename)

// Dscache should have two entries now. They are alphabetized by pretty name, and have all
// the expected data.
Expand Down Expand Up @@ -651,7 +651,7 @@ func TestSaveDscacheExistingDataset(t *testing.T) {
}

cacheFilename := cache.Filename
cache = dscache.NewDscache(ctx, fs, nil, run.Username(), cacheFilename)
cache = dscache.NewDscache(ctx, fs, event.NilBus, run.Username(), cacheFilename)

// Dscache should now have one reference. Now topIndex is 2 because there is another "commit".
actual = cache.VerboseString(false)
Expand Down Expand Up @@ -740,7 +740,7 @@ func TestSaveDscacheThenRemoveAll(t *testing.T) {
return
}
cacheFilename := cache.Filename
cache = dscache.NewDscache(ctx, fs, nil, run.Username(), cacheFilename)
cache = dscache.NewDscache(ctx, fs, event.NilBus, run.Username(), cacheFilename)

// Dscache should now have one reference.
actual = cache.VerboseString(false)
Expand Down Expand Up @@ -822,7 +822,7 @@ func TestSaveDscacheThenRemoveVersions(t *testing.T) {
t.Errorf("error creating local filesystem: %s", err)
}
cacheFilename := cache.Filename
cache = dscache.NewDscache(ctx, fs, nil, run.Username(), cacheFilename)
cache = dscache.NewDscache(ctx, fs, event.NilBus, run.Username(), cacheFilename)

// Dscache should now have one reference.
actual = cache.VerboseString(false)
Expand Down
44 changes: 28 additions & 16 deletions dscache/dscache.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
"github.com/qri-io/qfs"
"github.com/qri-io/qri/dscache/dscachefb"
"github.com/qri-io/qri/dsref"
"github.com/qri-io/qri/event/hook"
"github.com/qri-io/qri/event"
"github.com/qri-io/qri/repo/profile"
reporef "github.com/qri-io/qri/repo/ref"
)
Expand All @@ -40,7 +40,7 @@ type Dscache struct {

// NewDscache will construct a dscache from the given filename, or will construct an empty dscache
// that will save to the given filename. Using an empty filename will disable loading and saving
func NewDscache(ctx context.Context, fsys qfs.Filesystem, hooks []hook.ChangeNotifier, username, filename string) *Dscache {
func NewDscache(ctx context.Context, fsys qfs.Filesystem, bus event.Bus, username, filename string) *Dscache {
cache := Dscache{Filename: filename}
f, err := fsys.Get(ctx, filename)
if err == nil {
Expand All @@ -55,9 +55,13 @@ func NewDscache(ctx context.Context, fsys qfs.Filesystem, hooks []hook.ChangeNot
}
}
cache.DefaultUsername = username
for _, h := range hooks {
h.SetChangeHook(cache.update)
}
bus.Subscribe(cache.handler,
event.ETDatasetNameInit,
event.ETDatasetCommitChange,
event.ETDatasetDeleteAll,
event.ETDatasetRename,
event.ETDatasetCreateLink)

return &cache
}

Expand Down Expand Up @@ -232,30 +236,38 @@ func (d *Dscache) validateProfileID(profileID string) bool {
return len(profileID) == lengthOfProfileID
}

func (d *Dscache) update(act hook.DsChange) {
switch act.Type {
case hook.DatasetNameInit:
func (d *Dscache) handler(_ context.Context, t event.Type, payload interface{}) error {
act, ok := payload.(event.DsChange)
if !ok {
log.Error("dscache got an event with a payload that isn't a event.DsChange type: %v", payload)
return nil
}

switch t {
case event.ETDatasetNameInit:
if err := d.updateInitDataset(act); err != nil && err != ErrNoDscache {
log.Error(err)
}
case hook.DatasetCommitChange:
case event.ETDatasetCommitChange:
if err := d.updateChangeCursor(act); err != nil && err != ErrNoDscache {
log.Error(err)
}
case hook.DatasetDeleteAll:
case event.ETDatasetDeleteAll:
if err := d.updateDeleteDataset(act); err != nil && err != ErrNoDscache {
log.Error(err)
}
case hook.DatasetRename:
case event.ETDatasetRename:
// TODO(dustmop): Handle renames
case hook.DatasetCreateLink:
case event.ETDatasetCreateLink:
if err := d.updateCreateLink(act); err != nil && err != ErrNoDscache {
log.Error(err)
}
}

return nil
}

func (d *Dscache) updateInitDataset(act hook.DsChange) error {
func (d *Dscache) updateInitDataset(act event.DsChange) error {
if d.IsEmpty() {
// Only create a new dscache if that feature is enabled. This way no one is forced to
// use dscache without opting in.
Expand Down Expand Up @@ -303,7 +315,7 @@ func (d *Dscache) updateInitDataset(act hook.DsChange) error {
}

// Copy the entire dscache, except for the matching entry, rebuild that one to modify it
func (d *Dscache) updateChangeCursor(act hook.DsChange) error {
func (d *Dscache) updateChangeCursor(act event.DsChange) error {
if d.IsEmpty() {
return ErrNoDscache
}
Expand Down Expand Up @@ -347,7 +359,7 @@ func (d *Dscache) updateChangeCursor(act hook.DsChange) error {
}

// Copy the entire dscache, except leave out the matching entry.
func (d *Dscache) updateDeleteDataset(act hook.DsChange) error {
func (d *Dscache) updateDeleteDataset(act event.DsChange) error {
if d.IsEmpty() {
return ErrNoDscache
}
Expand All @@ -370,7 +382,7 @@ func (d *Dscache) updateDeleteDataset(act hook.DsChange) error {
}

// Copy the entire dscache, except for the matching entry, which is copied then assigned an fsiPath
func (d *Dscache) updateCreateLink(act hook.DsChange) error {
func (d *Dscache) updateCreateLink(act event.DsChange) error {
if d.IsEmpty() {
return ErrNoDscache
}
Expand Down
11 changes: 6 additions & 5 deletions dscache/dscache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
testPeers "github.com/qri-io/qri/config/test"
"github.com/qri-io/qri/dsref"
dsrefspec "github.com/qri-io/qri/dsref/spec"
"github.com/qri-io/qri/event"
"github.com/qri-io/qri/identity"
"github.com/qri-io/qri/logbook"
"github.com/qri-io/qri/logbook/oplog"
Expand Down Expand Up @@ -67,11 +68,11 @@ func TestDscacheAssignSaveAndLoad(t *testing.T) {

// A dscache that will save when it is assigned
dscacheFile := filepath.Join(tmpdir, "dscache.qfb")
saveable := NewDscache(ctx, fs, nil, peername, dscacheFile)
saveable := NewDscache(ctx, fs, event.NilBus, peername, dscacheFile)
saveable.Assign(constructed)

// Load the dscache from its serialized file, verify it has correct data
loadable := NewDscache(ctx, fs, nil, peername, dscacheFile)
loadable := NewDscache(ctx, fs, event.NilBus, peername, dscacheFile)
if loadable.Root.UsersLength() != 1 {
t.Errorf("expected, 1 user, got %d users", loadable.Root.UsersLength())
}
Expand All @@ -93,7 +94,7 @@ func TestResolveRef(t *testing.T) {
t.Errorf("error creating local filesystem: %s", err)
}
path := filepath.Join(tmpdir, "dscache.qfb")
dsc := NewDscache(ctx, fs, nil, "test_resolve_ref_user", path)
dsc := NewDscache(ctx, fs, event.NilBus, "test_resolve_ref_user", path)

dsrefspec.AssertResolverSpec(t, dsc, func(r dsref.Ref, _ identity.Author, _ *oplog.Log) error {
builder := NewBuilder()
Expand All @@ -119,11 +120,11 @@ func TestCacheRefConsistency(t *testing.T) {

localUsername := "local_user"
localDsName := "local_dataset"
book, err := logbook.NewJournal(testPeers.GetTestPeerInfo(0).PrivKey, localUsername, fsys, "/mem/logbook.qfb")
book, err := logbook.NewJournal(testPeers.GetTestPeerInfo(0).PrivKey, localUsername, event.NilBus, fsys, "/mem/logbook.qfb")
if err != nil {
t.Fatal(err)
}
dsc := NewDscache(ctx, fsys, nil, "", "dscache.qfb")
dsc := NewDscache(ctx, fsys, event.NilBus, "", "dscache.qfb")

_, _, err = dsrefspec.GenerateExampleOplog(ctx, book, localDsName, "/ipfs/QmLocalExample")
if err != nil {
Expand Down
3 changes: 2 additions & 1 deletion dsref/spec/resolve.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/qri-io/qfs"
testPeers "github.com/qri-io/qri/config/test"
"github.com/qri-io/qri/dsref"
"github.com/qri-io/qri/event"
"github.com/qri-io/qri/identity"
"github.com/qri-io/qri/logbook"
"github.com/qri-io/qri/logbook/oplog"
Expand Down Expand Up @@ -177,7 +178,7 @@ func ConsistentResolvers(t *testing.T, ref dsref.Ref, resolvers ...dsref.Resolve
func ForeignLogbook(t *testing.T, username string) *logbook.Book {
pk := testPeers.GetTestPeerInfo(9).PrivKey
ms := qfs.NewMemFS()
journal, err := logbook.NewJournal(pk, username, ms, "/mem/logbook.qfb")
journal, err := logbook.NewJournal(pk, username, event.NilBus, ms, "/mem/logbook.qfb")
if err != nil {
t.Fatal(err)
}
Expand Down
35 changes: 35 additions & 0 deletions event/dataset.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package event

import (
"github.com/qri-io/qri/dsref"
)

const (
// ETDatasetNameInit is when a dataset is initialized
// payload is a DsChange
ETDatasetNameInit = Type("dataset:Init")
// ETDatasetCommitChange is when a dataset changes its newest commit
// payload is a DsChange
ETDatasetCommitChange = Type("dataset:CommitChange")
// ETDatasetDeleteAll is when a dataset is entirely deleted
// payload is a DsChange
ETDatasetDeleteAll = Type("dataset:DeleteAll")
// ETDatasetRename is when a dataset is renamed
// payload is a DsChange
ETDatasetRename = Type("dataset:Rename")
// ETDatasetCreateLink is when a dataset is linked to a working directory
// payload is a DsChange
ETDatasetCreateLink = Type("dataset:CreateLink")
)

// DsChange represents the result of a change to a dataset
type DsChange struct {
InitID string
TopIndex int
ProfileID string
Username string
PrettyName string
HeadRef string
Info *dsref.VersionInfo
Dir string
}
Loading

0 comments on commit bc071a0

Please sign in to comment.