Skip to content

Commit

Permalink
refactor(event): wiring watchfs up to FSI events
Browse files Browse the repository at this point in the history
  • Loading branch information
b5 committed Feb 27, 2020
1 parent bec4838 commit 65ecc5e
Show file tree
Hide file tree
Showing 11 changed files with 146 additions and 39 deletions.
6 changes: 3 additions & 3 deletions api/websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ const (
func (s Server) ServeWebsocket(ctx context.Context) {
// Watch the filesystem. Events will be sent to websocket connections.
node := s.Node()
fsmessages, err := s.startFilesysWatcher(node)
fsmessages, err := s.startFilesysWatcher(ctx, node)
if err != nil {
log.Infof("Watching filesystem error: %s", err)
return
Expand Down Expand Up @@ -113,7 +113,7 @@ func (s Server) ServeWebsocket(ctx context.Context) {
}()
}

func (s Server) startFilesysWatcher(node *p2p.QriNode) (chan watchfs.FilesysEvent, error) {
func (s Server) startFilesysWatcher(ctx context.Context, node *p2p.QriNode) (chan watchfs.FilesysEvent, error) {
refs, err := node.Repo.References(0, 100)
if err != nil {
return nil, err
Expand All @@ -131,7 +131,7 @@ func (s Server) startFilesysWatcher(node *p2p.QriNode) (chan watchfs.FilesysEven
}
// Watch those paths.
// TODO(dlong): When datasets are removed or renamed update the watchlist.
s.Instance.Watcher = watchfs.NewFilesysWatcher()
s.Instance.Watcher = watchfs.NewFilesysWatcher(ctx, s.Instance.Bus())
fsmessages := s.Instance.Watcher.Begin(paths)
return fsmessages, nil
}
Expand Down
9 changes: 5 additions & 4 deletions dscache/dscache.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,12 +178,13 @@ func (d *Dscache) ListRefs() ([]reporef.DatasetRef, error) {
}

func (d *Dscache) update(act *logbook.Action) {
if act.Type == logbook.ActionDatasetNameInit {
if err := d.updateInitDataset(act); err != nil {
switch act.Type {
case logbook.ActionDatasetNameInit:
if err := d.updateInitDataset(act); err != nil && err != ErrNoDscache {
log.Error(err)
}
} else if act.Type == logbook.ActionDatasetChange {
if err := d.updateMoveCursor(act); err != nil {
case logbook.ActionDatasetChange:
if err := d.updateMoveCursor(act); err != nil && err != ErrNoDscache {
log.Error(err)
}
}
Expand Down
41 changes: 41 additions & 0 deletions event/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,19 @@ type Event struct {
Payload interface{}
}

// Publisher is an interface that can only publish an event
type Publisher interface {
Publish(t Topic, data interface{})
}

// NilPublisher replaces a nil value, does nothing
type NilPublisher struct {
}

// Publish does nothing with the event
func (n *NilPublisher) Publish(t Topic, data interface{}) {
}

// Bus is a central coordination point for event publication and subscription
// zero or more subscribers register topics to be notified of, a publisher
// writes a topic event to the bus, which broadcasts to all subscribers of that
Expand All @@ -32,11 +45,15 @@ type Bus interface {
Publish(t Topic, data interface{})
// Subscribe to one or more topics
Subscribe(topics ...Topic) <-chan Event
// Unsubscribe cleans up a channel that no longer need to receive events
Unsubscribe(<-chan Event)
// SubscribeOnce to one or more topics. the returned channel will only fire
// once, when the first event that matches any of the given topics
// the common use case for multiple subscriptions is subscribing to both
// success and error events
SubscribeOnce(types ...Topic) <-chan Event
// NumSubscriptions returns the number of subscribers to the bus's events
NumSubscribers() int
}

type dataChannels []chan Event
Expand Down Expand Up @@ -134,6 +151,30 @@ func (b *bus) Subscribe(topics ...Topic) <-chan Event {
return ch
}

// Unsubscribe cleans up a channel that no longer need to receive events
func (b *bus) Unsubscribe(unsub <-chan Event) {
for topic, channels := range b.subs {
var replace dataChannels
for i, ch := range channels {
if ch == unsub {
replace = append(channels[:i], channels[i+1:]...)
}
}
if replace != nil {
b.subs[topic] = replace
}
}
}

// NumSubscribers returns the number of subscribers to the bus's events
func (b *bus) NumSubscribers() int {
total := 0
for _, channels := range b.subs {
total += len(channels)
}
return total
}

// SubscribeOnce will only get one event of the topic, then close itself
func (b *bus) SubscribeOnce(topics ...Topic) <-chan Event {
b.onceLk.Lock()
Expand Down
26 changes: 26 additions & 0 deletions event/event_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package event
import (
"context"
"fmt"
"testing"
)

func Example() {
Expand Down Expand Up @@ -51,3 +52,28 @@ func Example() {
// hello
// it didn't work?
}

func TestSubscribeUnsubscribe(t *testing.T) {
ctx := context.Background()
const testTopic = Topic("test_event")

b := NewBus(ctx)
ch1 := b.Subscribe(testTopic)
ch2 := b.Subscribe(testTopic)

if b.NumSubscribers() != 2 {
t.Errorf("expected 2 subscribers, got %d", b.NumSubscribers())
}

b.Unsubscribe(ch1)

if b.NumSubscribers() != 1 {
t.Errorf("expected 1 subscribers, got %d", b.NumSubscribers())
}

b.Unsubscribe(ch2)

if b.NumSubscribers() != 0 {
t.Errorf("expected 1 subscribers, got %d", b.NumSubscribers())
}
}
16 changes: 14 additions & 2 deletions fsi/fsi.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
golog "github.com/ipfs/go-log"
"github.com/qri-io/qri/base"
"github.com/qri-io/qri/base/component"
"github.com/qri-io/qri/event"
"github.com/qri-io/qri/repo"
reporef "github.com/qri-io/qri/repo/ref"
)
Expand Down Expand Up @@ -60,11 +61,15 @@ func RepoPath(repoPath string) string {
type FSI struct {
// repository for resolving dataset names
repo repo.Repo
pub event.Publisher
}

// NewFSI creates an FSI instance from a path to a links flatbuffer file
func NewFSI(r repo.Repo) *FSI {
return &FSI{repo: r}
func NewFSI(r repo.Repo, pub event.Publisher) *FSI {
if pub == nil {
pub = &event.NilPublisher{}
}
return &FSI{repo: r, pub: pub}
}

// LinkedRefs returns a list of linked datasets and their connected directories
Expand Down Expand Up @@ -143,6 +148,13 @@ func (fsi *FSI) CreateLink(dirPath, refStr string) (alias string, rollback func(
removeRefFunc()
}

// Send an event to the bus about this checkout
fsi.pub.Publish(event.ETFSICreateLinkEvent, event.FSICreateLinkEvent{
FSIPath: dirPath,
Username: ref.Peername,
Dsname: ref.Name,
})

return ref.AliasString(), removeLinkAndRemoveRefFunc, err
}

Expand Down
14 changes: 7 additions & 7 deletions fsi/fsi_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func TestCreateLink(t *testing.T) {
paths := NewTmpPaths()
defer paths.Close()

fsi := NewFSI(paths.testRepo)
fsi := NewFSI(paths.testRepo, nil)
link, _, err := fsi.CreateLink(paths.firstDir, "me/test_ds")
if err != nil {
t.Fatalf(err.Error())
Expand Down Expand Up @@ -89,7 +89,7 @@ func TestCreateLinkTwice(t *testing.T) {
paths := NewTmpPaths()
defer paths.Close()

fsi := NewFSI(paths.testRepo)
fsi := NewFSI(paths.testRepo, nil)
_, _, err := fsi.CreateLink(paths.firstDir, "me/test_ds")
if err != nil {
t.Fatalf(err.Error())
Expand Down Expand Up @@ -142,7 +142,7 @@ func TestCreateLinkAlreadyLinked(t *testing.T) {
paths := NewTmpPaths()
defer paths.Close()

fsi := NewFSI(paths.testRepo)
fsi := NewFSI(paths.testRepo, nil)
_, _, err := fsi.CreateLink(paths.firstDir, "me/test_ds")
if err != nil {
t.Fatalf(err.Error())
Expand All @@ -163,7 +163,7 @@ func TestCreateLinkAgainOnceQriRefRemoved(t *testing.T) {
paths := NewTmpPaths()
defer paths.Close()

fsi := NewFSI(paths.testRepo)
fsi := NewFSI(paths.testRepo, nil)
_, _, err := fsi.CreateLink(paths.firstDir, "me/test_ds")
if err != nil {
t.Fatalf(err.Error())
Expand Down Expand Up @@ -205,7 +205,7 @@ func TestModifyLinkReference(t *testing.T) {
paths := NewTmpPaths()
defer paths.Close()

fsi := NewFSI(paths.testRepo)
fsi := NewFSI(paths.testRepo, nil)
_, _, err := fsi.CreateLink(paths.firstDir, "me/test_ds")
if err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -248,7 +248,7 @@ func TestModifyLinkDirectory(t *testing.T) {
paths := NewTmpPaths()
defer paths.Close()

fsi := NewFSI(paths.testRepo)
fsi := NewFSI(paths.testRepo, nil)
_, _, err := fsi.CreateLink(paths.firstDir, "me/another_dataset")
if err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -277,7 +277,7 @@ func TestUnlink(t *testing.T) {
paths := NewTmpPaths()
defer paths.Close()

fsi := NewFSI(paths.testRepo)
fsi := NewFSI(paths.testRepo, nil)
_, _, err := fsi.CreateLink(paths.firstDir, "me/test_ds")
if err != nil {
t.Fatalf(err.Error())
Expand Down
8 changes: 4 additions & 4 deletions fsi/status_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func TestStatusValid(t *testing.T) {
paths := NewTmpPaths()
defer paths.Close()

fsi := NewFSI(paths.testRepo)
fsi := NewFSI(paths.testRepo, nil)
_, _, err := fsi.CreateLink(paths.firstDir, "me/test_ds")
if err != nil {
t.Fatalf(err.Error())
Expand Down Expand Up @@ -81,7 +81,7 @@ func TestStatusInvalidDataset(t *testing.T) {
paths := NewTmpPaths()
defer paths.Close()

fsi := NewFSI(paths.testRepo)
fsi := NewFSI(paths.testRepo, nil)
_, _, err := fsi.CreateLink(paths.firstDir, "me/test_ds")
if err != nil {
t.Fatalf(err.Error())
Expand All @@ -105,7 +105,7 @@ func TestStatusInvalidMeta(t *testing.T) {
paths := NewTmpPaths()
defer paths.Close()

fsi := NewFSI(paths.testRepo)
fsi := NewFSI(paths.testRepo, nil)
_, _, err := fsi.CreateLink(paths.firstDir, "me/test_ds")
if err != nil {
t.Fatalf(err.Error())
Expand All @@ -127,7 +127,7 @@ func TestStatusNotFound(t *testing.T) {
paths := NewTmpPaths()
defer paths.Close()

fsi := NewFSI(paths.testRepo)
fsi := NewFSI(paths.testRepo, nil)
_, _, err := fsi.CreateLink(paths.firstDir, "me/test_ds")
if err != nil {
t.Fatalf(err.Error())
Expand Down
9 changes: 0 additions & 9 deletions lib/fsi.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
"github.com/qri-io/qri/base"
"github.com/qri-io/qri/base/component"
"github.com/qri-io/qri/base/dsfs"
"github.com/qri-io/qri/event"
"github.com/qri-io/qri/fsi"
"github.com/qri-io/qri/repo"
reporef "github.com/qri-io/qri/repo/ref"
Expand Down Expand Up @@ -190,14 +189,6 @@ func (m *FSIMethods) Checkout(p *CheckoutParams, out *string) (err error) {
}
log.Debugf("Checkout wrote components, successfully checked out dataset")

// Send an event to the bus about this checkout
m.inst.Bus().Publish(event.ETFSICreateLinkEvent, event.FSICreateLinkEvent{
FSIPath: p.Dir,
Username: ref.Peername,
Dsname: ref.Name,
})
log.Debugf("Checkout published an event")

log.Debugf("Checkout successfully checked out dataset")
return nil
}
Expand Down
9 changes: 3 additions & 6 deletions lib/lib.go
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,7 @@ func NewInstance(ctx context.Context, repoPath string, opts ...Option) (qri *Ins
streams: o.Streams,
registry: o.regclient,
logbook: o.logbook,
bus: event.NewBus(ctx),
}
qri = inst

Expand Down Expand Up @@ -371,10 +372,6 @@ func NewInstance(ctx context.Context, repoPath string, opts ...Option) (qri *Ins
}
}

if inst.bus == nil {
inst.bus = newEventBus(ctx)
}

if inst.registry == nil {
inst.registry = newRegClient(ctx, cfg)
}
Expand All @@ -400,7 +397,7 @@ func NewInstance(ctx context.Context, repoPath string, opts ...Option) (qri *Ins
// were somewhere else we could move it there
_ = base.SetFileHidden(inst.repoPath)

inst.fsi = fsi.NewFSI(inst.repo)
inst.fsi = fsi.NewFSI(inst.repo, inst.bus)
}

if inst.node == nil {
Expand Down Expand Up @@ -590,8 +587,8 @@ func NewInstanceFromConfigAndNode(cfg *config.Config, node *p2p.QriNode) *Instan
inst.repo = node.Repo
inst.store = node.Repo.Store()
inst.qfs = node.Repo.Filesystem()
inst.fsi = fsi.NewFSI(inst.repo)
inst.bus = event.NewBus(ctx)
inst.fsi = fsi.NewFSI(inst.repo, inst.bus)
}

return inst
Expand Down
Loading

0 comments on commit 65ecc5e

Please sign in to comment.