Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(event): wiring watchfs up to FSI events #1142

Merged
merged 1 commit into from
Feb 27, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions api/websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ const (
func (s Server) ServeWebsocket(ctx context.Context) {
// Watch the filesystem. Events will be sent to websocket connections.
node := s.Node()
fsmessages, err := s.startFilesysWatcher(node)
fsmessages, err := s.startFilesysWatcher(ctx, node)
if err != nil {
log.Infof("Watching filesystem error: %s", err)
return
Expand Down Expand Up @@ -113,7 +113,7 @@ func (s Server) ServeWebsocket(ctx context.Context) {
}()
}

func (s Server) startFilesysWatcher(node *p2p.QriNode) (chan watchfs.FilesysEvent, error) {
func (s Server) startFilesysWatcher(ctx context.Context, node *p2p.QriNode) (chan watchfs.FilesysEvent, error) {
refs, err := node.Repo.References(0, 100)
if err != nil {
return nil, err
Expand All @@ -131,7 +131,7 @@ func (s Server) startFilesysWatcher(node *p2p.QriNode) (chan watchfs.FilesysEven
}
// Watch those paths.
// TODO(dlong): When datasets are removed or renamed update the watchlist.
s.Instance.Watcher = watchfs.NewFilesysWatcher()
s.Instance.Watcher = watchfs.NewFilesysWatcher(ctx, s.Instance.Bus())
fsmessages := s.Instance.Watcher.Begin(paths)
return fsmessages, nil
}
Expand Down
9 changes: 5 additions & 4 deletions dscache/dscache.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,12 +178,13 @@ func (d *Dscache) ListRefs() ([]reporef.DatasetRef, error) {
}

func (d *Dscache) update(act *logbook.Action) {
if act.Type == logbook.ActionDatasetNameInit {
if err := d.updateInitDataset(act); err != nil {
switch act.Type {
case logbook.ActionDatasetNameInit:
if err := d.updateInitDataset(act); err != nil && err != ErrNoDscache {
log.Error(err)
}
} else if act.Type == logbook.ActionDatasetChange {
if err := d.updateMoveCursor(act); err != nil {
case logbook.ActionDatasetChange:
if err := d.updateMoveCursor(act); err != nil && err != ErrNoDscache {
log.Error(err)
}
}
Expand Down
41 changes: 41 additions & 0 deletions event/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,19 @@ type Event struct {
Payload interface{}
}

// Publisher is an interface that can only publish an event
type Publisher interface {
Publish(t Topic, data interface{})
}

// NilPublisher replaces a nil value, does nothing
type NilPublisher struct {
}

// Publish does nothing with the event
func (n *NilPublisher) Publish(t Topic, data interface{}) {
}

// Bus is a central coordination point for event publication and subscription
// zero or more subscribers register topics to be notified of, a publisher
// writes a topic event to the bus, which broadcasts to all subscribers of that
Expand All @@ -32,11 +45,15 @@ type Bus interface {
Publish(t Topic, data interface{})
// Subscribe to one or more topics
Subscribe(topics ...Topic) <-chan Event
// Unsubscribe cleans up a channel that no longer need to receive events
Unsubscribe(<-chan Event)
// SubscribeOnce to one or more topics. the returned channel will only fire
// once, when the first event that matches any of the given topics
// the common use case for multiple subscriptions is subscribing to both
// success and error events
SubscribeOnce(types ...Topic) <-chan Event
// NumSubscriptions returns the number of subscribers to the bus's events
NumSubscribers() int
}

type dataChannels []chan Event
Expand Down Expand Up @@ -134,6 +151,30 @@ func (b *bus) Subscribe(topics ...Topic) <-chan Event {
return ch
}

// Unsubscribe cleans up a channel that no longer need to receive events
func (b *bus) Unsubscribe(unsub <-chan Event) {
for topic, channels := range b.subs {
var replace dataChannels
for i, ch := range channels {
if ch == unsub {
replace = append(channels[:i], channels[i+1:]...)
}
}
if replace != nil {
b.subs[topic] = replace
}
}
}

// NumSubscribers returns the number of subscribers to the bus's events
func (b *bus) NumSubscribers() int {
total := 0
for _, channels := range b.subs {
total += len(channels)
}
return total
}

// SubscribeOnce will only get one event of the topic, then close itself
func (b *bus) SubscribeOnce(topics ...Topic) <-chan Event {
b.onceLk.Lock()
Expand Down
26 changes: 26 additions & 0 deletions event/event_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package event
import (
"context"
"fmt"
"testing"
)

func Example() {
Expand Down Expand Up @@ -51,3 +52,28 @@ func Example() {
// hello
// it didn't work?
}

func TestSubscribeUnsubscribe(t *testing.T) {
ctx := context.Background()
const testTopic = Topic("test_event")

b := NewBus(ctx)
ch1 := b.Subscribe(testTopic)
ch2 := b.Subscribe(testTopic)

if b.NumSubscribers() != 2 {
t.Errorf("expected 2 subscribers, got %d", b.NumSubscribers())
}

b.Unsubscribe(ch1)

if b.NumSubscribers() != 1 {
t.Errorf("expected 1 subscribers, got %d", b.NumSubscribers())
}

b.Unsubscribe(ch2)

if b.NumSubscribers() != 0 {
t.Errorf("expected 1 subscribers, got %d", b.NumSubscribers())
}
}
16 changes: 14 additions & 2 deletions fsi/fsi.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
golog "github.com/ipfs/go-log"
"github.com/qri-io/qri/base"
"github.com/qri-io/qri/base/component"
"github.com/qri-io/qri/event"
"github.com/qri-io/qri/repo"
reporef "github.com/qri-io/qri/repo/ref"
)
Expand Down Expand Up @@ -60,11 +61,15 @@ func RepoPath(repoPath string) string {
type FSI struct {
// repository for resolving dataset names
repo repo.Repo
pub event.Publisher
}

// NewFSI creates an FSI instance from a path to a links flatbuffer file
func NewFSI(r repo.Repo) *FSI {
return &FSI{repo: r}
func NewFSI(r repo.Repo, pub event.Publisher) *FSI {
if pub == nil {
pub = &event.NilPublisher{}
}
return &FSI{repo: r, pub: pub}
}

// LinkedRefs returns a list of linked datasets and their connected directories
Expand Down Expand Up @@ -143,6 +148,13 @@ func (fsi *FSI) CreateLink(dirPath, refStr string) (alias string, rollback func(
removeRefFunc()
}

// Send an event to the bus about this checkout
fsi.pub.Publish(event.ETFSICreateLinkEvent, event.FSICreateLinkEvent{
FSIPath: dirPath,
Username: ref.Peername,
Dsname: ref.Name,
})

return ref.AliasString(), removeLinkAndRemoveRefFunc, err
}

Expand Down
14 changes: 7 additions & 7 deletions fsi/fsi_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func TestCreateLink(t *testing.T) {
paths := NewTmpPaths()
defer paths.Close()

fsi := NewFSI(paths.testRepo)
fsi := NewFSI(paths.testRepo, nil)
link, _, err := fsi.CreateLink(paths.firstDir, "me/test_ds")
if err != nil {
t.Fatalf(err.Error())
Expand Down Expand Up @@ -89,7 +89,7 @@ func TestCreateLinkTwice(t *testing.T) {
paths := NewTmpPaths()
defer paths.Close()

fsi := NewFSI(paths.testRepo)
fsi := NewFSI(paths.testRepo, nil)
_, _, err := fsi.CreateLink(paths.firstDir, "me/test_ds")
if err != nil {
t.Fatalf(err.Error())
Expand Down Expand Up @@ -142,7 +142,7 @@ func TestCreateLinkAlreadyLinked(t *testing.T) {
paths := NewTmpPaths()
defer paths.Close()

fsi := NewFSI(paths.testRepo)
fsi := NewFSI(paths.testRepo, nil)
_, _, err := fsi.CreateLink(paths.firstDir, "me/test_ds")
if err != nil {
t.Fatalf(err.Error())
Expand All @@ -163,7 +163,7 @@ func TestCreateLinkAgainOnceQriRefRemoved(t *testing.T) {
paths := NewTmpPaths()
defer paths.Close()

fsi := NewFSI(paths.testRepo)
fsi := NewFSI(paths.testRepo, nil)
_, _, err := fsi.CreateLink(paths.firstDir, "me/test_ds")
if err != nil {
t.Fatalf(err.Error())
Expand Down Expand Up @@ -205,7 +205,7 @@ func TestModifyLinkReference(t *testing.T) {
paths := NewTmpPaths()
defer paths.Close()

fsi := NewFSI(paths.testRepo)
fsi := NewFSI(paths.testRepo, nil)
_, _, err := fsi.CreateLink(paths.firstDir, "me/test_ds")
if err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -248,7 +248,7 @@ func TestModifyLinkDirectory(t *testing.T) {
paths := NewTmpPaths()
defer paths.Close()

fsi := NewFSI(paths.testRepo)
fsi := NewFSI(paths.testRepo, nil)
_, _, err := fsi.CreateLink(paths.firstDir, "me/another_dataset")
if err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -277,7 +277,7 @@ func TestUnlink(t *testing.T) {
paths := NewTmpPaths()
defer paths.Close()

fsi := NewFSI(paths.testRepo)
fsi := NewFSI(paths.testRepo, nil)
_, _, err := fsi.CreateLink(paths.firstDir, "me/test_ds")
if err != nil {
t.Fatalf(err.Error())
Expand Down
8 changes: 4 additions & 4 deletions fsi/status_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func TestStatusValid(t *testing.T) {
paths := NewTmpPaths()
defer paths.Close()

fsi := NewFSI(paths.testRepo)
fsi := NewFSI(paths.testRepo, nil)
_, _, err := fsi.CreateLink(paths.firstDir, "me/test_ds")
if err != nil {
t.Fatalf(err.Error())
Expand Down Expand Up @@ -81,7 +81,7 @@ func TestStatusInvalidDataset(t *testing.T) {
paths := NewTmpPaths()
defer paths.Close()

fsi := NewFSI(paths.testRepo)
fsi := NewFSI(paths.testRepo, nil)
_, _, err := fsi.CreateLink(paths.firstDir, "me/test_ds")
if err != nil {
t.Fatalf(err.Error())
Expand All @@ -105,7 +105,7 @@ func TestStatusInvalidMeta(t *testing.T) {
paths := NewTmpPaths()
defer paths.Close()

fsi := NewFSI(paths.testRepo)
fsi := NewFSI(paths.testRepo, nil)
_, _, err := fsi.CreateLink(paths.firstDir, "me/test_ds")
if err != nil {
t.Fatalf(err.Error())
Expand All @@ -127,7 +127,7 @@ func TestStatusNotFound(t *testing.T) {
paths := NewTmpPaths()
defer paths.Close()

fsi := NewFSI(paths.testRepo)
fsi := NewFSI(paths.testRepo, nil)
_, _, err := fsi.CreateLink(paths.firstDir, "me/test_ds")
if err != nil {
t.Fatalf(err.Error())
Expand Down
9 changes: 0 additions & 9 deletions lib/fsi.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
"github.com/qri-io/qri/base"
"github.com/qri-io/qri/base/component"
"github.com/qri-io/qri/base/dsfs"
"github.com/qri-io/qri/event"
"github.com/qri-io/qri/fsi"
"github.com/qri-io/qri/repo"
reporef "github.com/qri-io/qri/repo/ref"
Expand Down Expand Up @@ -190,14 +189,6 @@ func (m *FSIMethods) Checkout(p *CheckoutParams, out *string) (err error) {
}
log.Debugf("Checkout wrote components, successfully checked out dataset")

// Send an event to the bus about this checkout
m.inst.Bus().Publish(event.ETFSICreateLinkEvent, event.FSICreateLinkEvent{
FSIPath: p.Dir,
Username: ref.Peername,
Dsname: ref.Name,
})
log.Debugf("Checkout published an event")

log.Debugf("Checkout successfully checked out dataset")
return nil
}
Expand Down
9 changes: 3 additions & 6 deletions lib/lib.go
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,7 @@ func NewInstance(ctx context.Context, repoPath string, opts ...Option) (qri *Ins
streams: o.Streams,
registry: o.regclient,
logbook: o.logbook,
bus: event.NewBus(ctx),
}
qri = inst

Expand Down Expand Up @@ -371,10 +372,6 @@ func NewInstance(ctx context.Context, repoPath string, opts ...Option) (qri *Ins
}
}

if inst.bus == nil {
inst.bus = newEventBus(ctx)
}

if inst.registry == nil {
inst.registry = newRegClient(ctx, cfg)
}
Expand All @@ -400,7 +397,7 @@ func NewInstance(ctx context.Context, repoPath string, opts ...Option) (qri *Ins
// were somewhere else we could move it there
_ = base.SetFileHidden(inst.repoPath)

inst.fsi = fsi.NewFSI(inst.repo)
inst.fsi = fsi.NewFSI(inst.repo, inst.bus)
}

if inst.node == nil {
Expand Down Expand Up @@ -590,8 +587,8 @@ func NewInstanceFromConfigAndNode(cfg *config.Config, node *p2p.QriNode) *Instan
inst.repo = node.Repo
inst.store = node.Repo.Store()
inst.qfs = node.Repo.Filesystem()
inst.fsi = fsi.NewFSI(inst.repo)
inst.bus = event.NewBus(ctx)
inst.fsi = fsi.NewFSI(inst.repo, inst.bus)
}

return inst
Expand Down
Loading