Skip to content

Commit

Permalink
feat(event): When a folder is checked out, add it to watchfs
Browse files Browse the repository at this point in the history
  • Loading branch information
dustmop committed Feb 19, 2020
1 parent 6068f88 commit 7536221
Show file tree
Hide file tree
Showing 7 changed files with 93 additions and 18 deletions.
40 changes: 31 additions & 9 deletions api/websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"time"

"github.com/qri-io/qri/base/component"
"github.com/qri-io/qri/event"
"github.com/qri-io/qri/p2p"
"github.com/qri-io/qri/watchfs"
"nhooyr.io/websocket"
Expand All @@ -21,6 +22,11 @@ const (
qriWebsocketProtocol = "qri-websocket"
)

// TODO(dlong): This file has a tight coupling between Websocket and Watchfs that makes sense
// for now, as they're two pieces working together on the same task, but will start to make
// less sense once more Websocket messages are being delivered, and as the event.Bus is used
// more places. Reconsider in the future how to better integrate these two pieces.

// ServeWebsocket creates a websocket that clients can connect to in order to get realtime events
func (s Server) ServeWebsocket(ctx context.Context) {
// Watch the filesystem. Events will be sent to websocket connections.
Expand Down Expand Up @@ -59,20 +65,37 @@ func (s Server) ServeWebsocket(ctx context.Context) {
}
defer srv.Close()

// Subscribe to FSI link creation events, which will affect filesystem watching
// TODO(dlong): A good example of tight coupling causing an issue: The Websocket
// implementation doesn't need to know about these events, but the FilesystemWatcher
// does. Ideally, this Subscribe call would happen along with the latter, not the former.
busEvents := s.Instance.Bus().Subscribe(event.ETFSICreateLinkEvent)

known := component.GetKnownFilenames()

// Filesystem events are forwarded to the websocket. In the future, this may be
// expanded to handle other types of events, such as SaveDatasetProgressEvent,
// and DiffProgressEvent, but this is fine for now.
go func() {
for {
e := <-fsmessages
if s.filterEvent(e, known) {
log.Debugf("filesys event: %s\n", e)
for k, c := range connections {
err = wsjson.Write(ctx, c, e)
if err != nil {
log.Errorf("connection %d: wsjson write error: %s", k, err)
select {
case e := <-busEvents:
log.Debugf("bus event: %s\n", e)
if fce, ok := e.Payload.(event.FSICreateLinkEvent); ok {
s.Instance.Watcher.Add(watchfs.EventPath{
Path: fce.FSIPath,
Username: fce.Username,
Dsname: fce.Dsname,
})
}
case fse := <-fsmessages:
if s.filterEvent(fse, known) {
log.Debugf("filesys event: %s\n", fse)
for k, c := range connections {
err = wsjson.Write(ctx, c, fse)
if err != nil {
log.Errorf("connection %d: wsjson write error: %s", k, err)
}
}
}
}
Expand Down Expand Up @@ -107,8 +130,7 @@ func (s Server) startFilesysWatcher(node *p2p.QriNode) (chan watchfs.FilesysEven
}
}
// Watch those paths.
// TODO(dlong): When datasets are init'd, or checked out, or removed, or renamed, update
// the watchlist.
// TODO(dlong): When datasets are removed or renamed update the watchlist.
s.Instance.Watcher = watchfs.NewFilesysWatcher()
fsmessages := s.Instance.Watcher.Begin(paths)
return fsmessages, nil
Expand Down
11 changes: 9 additions & 2 deletions event/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ type Bus interface {
// Publish an event to the bus
Publish(t Topic, data interface{})
// Subscribe to one or more topics
Subscribe(ch chan Event, topics ...Topic)
Subscribe(topics ...Topic) <-chan Event
// SubscribeOnce to one or more topics. the returned channel will only fire
// once, when the first event that matches any of the given topics
// the common use case for multiple subscriptions is subscribing to both
Expand Down Expand Up @@ -78,6 +78,7 @@ func NewBus(ctx context.Context) Bus {
return b
}

// Publish sends an event to the bus
func (b *bus) Publish(topic Topic, data interface{}) {
b.lk.RLock()
defer b.lk.RUnlock()
Expand Down Expand Up @@ -114,20 +115,26 @@ func (b *bus) Publish(topic Topic, data interface{}) {
}(event)
}

func (b *bus) Subscribe(ch chan Event, topics ...Topic) {
// Subscribe requests events from the given topic, returning a channel of those events
func (b *bus) Subscribe(topics ...Topic) <-chan Event {
b.lk.Lock()
defer b.lk.Unlock()
log.Debugf("Subscribe: %v", topics)

ch := make(chan Event)

for _, topic := range topics {
if prev, ok := b.subs[topic]; ok {
b.subs[topic] = append(prev, ch)
} else {
b.subs[topic] = dataChannels{ch}
}
}

return ch
}

// SubscribeOnce will only get one event of the topic, then close itself
func (b *bus) SubscribeOnce(topics ...Topic) <-chan Event {
b.onceLk.Lock()
defer b.onceLk.Unlock()
Expand Down
10 changes: 3 additions & 7 deletions event/event_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,9 @@ func Example() {

ctx, done := context.WithCancel(context.Background())
bus := NewBus(ctx)
ch1 := make(chan Event)
ch2 := make(chan Event)
ch3 := make(chan Event)

bus.Subscribe(ch1, ETMainSaidHello)
bus.Subscribe(ch2, ETMainSaidHello)
bus.Subscribe(ch3, ETMainSaidHello)
ch1 := bus.Subscribe(ETMainSaidHello)
ch2 := bus.Subscribe(ETMainSaidHello)
ch3 := bus.Subscribe(ETMainSaidHello)

go bus.Publish(ETMainSaidHello, "hello")

Expand Down
17 changes: 17 additions & 0 deletions event/fsi.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package event

import (
)

var (
// Event type when FSI creating a link between a dataset and working directory
ETFSICreateLinkEvent = Topic("fsi::createLinkEvent")
)

// FSICreateLinkEvent describes an FSI created link
type FSICreateLinkEvent struct {
FSIPath string
Username string
Dsname string
}

11 changes: 11 additions & 0 deletions lib/fsi.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/qri-io/qri/base"
"github.com/qri-io/qri/base/component"
"github.com/qri-io/qri/base/dsfs"
"github.com/qri-io/qri/event"
"github.com/qri-io/qri/fsi"
"github.com/qri-io/qri/repo"
reporef "github.com/qri-io/qri/repo/ref"
Expand Down Expand Up @@ -188,6 +189,16 @@ func (m *FSIMethods) Checkout(p *CheckoutParams, out *string) (err error) {
log.Debugf("Checkout, fsi.WriteComponents failed, error: %s", ref)
}
log.Debugf("Checkout wrote components, successfully checked out dataset")

// Send an event to the bus about this checkout
m.inst.Bus().Publish(event.ETFSICheckoutEvent, event.FSICheckoutEvent{
FSIPath: p.Dir,
Username: ref.Peername,
Dsname: ref.Name,
})
log.Debugf("Checkout published an event")

log.Debugf("Checkout successfully checked out dataset")
return nil
}

Expand Down
15 changes: 15 additions & 0 deletions lib/lib.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/qri-io/qri/config"
"github.com/qri-io/qri/config/migrate"
"github.com/qri-io/qri/dscache"
"github.com/qri-io/qri/event"
"github.com/qri-io/qri/fsi"
"github.com/qri-io/qri/logbook"
"github.com/qri-io/qri/p2p"
Expand Down Expand Up @@ -370,6 +371,10 @@ func NewInstance(ctx context.Context, repoPath string, opts ...Option) (qri *Ins
}
}

if inst.bus == nil {
inst.bus = newEventBus(ctx)
}

if inst.registry == nil {
inst.registry = newRegClient(ctx, cfg)
}
Expand Down Expand Up @@ -480,6 +485,10 @@ func newDscache(ctx context.Context, fs qfs.Filesystem, cfg *config.Config, repo
return dscache, nil
}

func newEventBus(ctx context.Context) event.Bus {
return event.NewBus(ctx)
}

func newRepo(path string, cfg *config.Config, store cafs.Filestore, fs qfs.Filesystem, book *logbook.Book, cache *dscache.Dscache) (r repo.Repo, err error) {
var pro *profile.Profile
if pro, err = profile.NewProfile(cfg.Profile); err != nil {
Expand Down Expand Up @@ -614,6 +623,7 @@ type Instance struct {
stats *stats.Stats
logbook *logbook.Book
dscache *dscache.Dscache
bus event.Bus

Watcher *watchfs.FilesysWatcher

Expand Down Expand Up @@ -655,6 +665,11 @@ func (inst *Instance) FSI() *fsi.FSI {
return inst.fsi
}

// Bus returns the event.Bus
func (inst *Instance) Bus() event.Bus {
return inst.bus
}

// ChangeConfig implements the ConfigSetter interface
func (inst *Instance) ChangeConfig(cfg *config.Config) (err error) {
cfg = cfg.WithPrivateValues(inst.cfg)
Expand Down
7 changes: 7 additions & 0 deletions watchfs/watchfs.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,15 @@ func (w *FilesysWatcher) Begin(paths []EventPath) chan FilesysEvent {
return messages
}

// Add starts watching an additional path
func (w *FilesysWatcher) Add(path EventPath) {
w.Assoc[path.Path] = path
w.Watcher.Add(path.Path)
}

// sendEvent sends a message on the channel about an event
func (w *FilesysWatcher) sendEvent(etype EventType, sour, dest string) {
log.Debugf("filesystem event %q %s -> %s\n", etype, sour, dest)
dir := filepath.Dir(sour)
ep := w.Assoc[dir]
event := FilesysEvent{
Expand Down

0 comments on commit 7536221

Please sign in to comment.