Skip to content

Commit

Permalink
refactor(api): watchfs sends over event bus, websocket API is an even…
Browse files Browse the repository at this point in the history
…t stream

move all filesystem-specific logic from websocket into watchfs. Rework watchfs to
publish over the event bus.

This sets up a broader shift for the websocket connection, which is now jus a stream
of events coming off the bus.

BREAKING CHANGE:
websocket JSON messages have changed. They're now _always_ an object with "type" and
"data" keys.
  • Loading branch information
b5 committed Jul 8, 2020
1 parent bc071a0 commit 3cc9949
Show file tree
Hide file tree
Showing 7 changed files with 243 additions and 217 deletions.
165 changes: 66 additions & 99 deletions api/websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,131 +2,98 @@ package api

import (
"context"
"fmt"

"net/http"
"time"

ma "github.com/multiformats/go-multiaddr"
manet "github.com/multiformats/go-multiaddr-net"
"github.com/qri-io/qri/base/component"
"github.com/qri-io/qri/p2p"
"github.com/qri-io/qri/event"
"github.com/qri-io/qri/watchfs"
"nhooyr.io/websocket"
"nhooyr.io/websocket/wsjson"
)

const (
qriWebsocketProtocol = "qri-websocket"
)

// TODO(dlong): This file has a tight coupling between Websocket and Watchfs that makes sense
// for now, as they're two pieces working together on the same task, but will start to make
// less sense once more Websocket messages are being delivered, and as the event.Bus is used
// more places. Reconsider in the future how to better integrate these two pieces.
const qriWebsocketProtocol = "qri-websocket"

// ServeWebsocket creates a websocket that clients can connect to in order to get realtime events
func (s Server) ServeWebsocket(ctx context.Context) {
c := s.Config().API
cfg := s.Config().API

// Watch the filesystem. Events will be sent to websocket connections.
node := s.Node()
fsmessages, err := s.startFilesysWatcher(ctx, node)
watcher, err := watchfs.NewFilesysWatcher(ctx, s.Instance.Bus())
if err != nil {
log.Infof("Watching filesystem error: %s", err)
log.Errorf("Watching filesystem error: %s", err)
return
}
s.Instance.Watcher = watcher
if err = s.Instance.Watcher.WatchAllFSIPaths(ctx, s.Repo()); err != nil {
log.Error(err)
}

go func() {
maAddress := c.WebsocketAddress
addr, err := ma.NewMultiaddr(maAddress)
if err != nil {
log.Errorf("cannot start Websocket: error parsing Websocket address %s: %w", maAddress, err.Error())
}

mal, err := manet.Listen(addr)
if err != nil {
log.Infof("Websocket listen on address %d error: %w", c.WebsocketAddress, err.Error())
return
}
l := manet.NetListener(mal)
defer l.Close()

// Collect all websocket connections. Should only be one at a time, but that may
// change in the future.
connections := []*websocket.Conn{}
srv := &http.Server{
Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
c, err := websocket.Accept(w, r, &websocket.AcceptOptions{
Subprotocols: []string{qriWebsocketProtocol},
InsecureSkipVerify: true,
})
if err != nil {
log.Debugf("Websocket accept error: %s", err)
return
}
connections = append(connections, c)
}),
ReadTimeout: time.Second * 15,
WriteTimeout: time.Second * 15,
}
defer srv.Close()
addr, err := ma.NewMultiaddr(cfg.WebsocketAddress)
if err != nil {
log.Errorf("cannot start Websocket: error parsing Websocket address %s: %w", cfg.WebsocketAddress, err.Error())
}

known := component.GetKnownFilenames()
mal, err := manet.Listen(addr)
if err != nil {
log.Infof("Websocket listen on address %d error: %w", cfg.WebsocketAddress, err.Error())
return
}
l := manet.NetListener(mal)
defer l.Close()

// Filesystem events are forwarded to the websocket. In the future, this may be
// expanded to handle other types of events, such as SaveDatasetProgressEvent,
// and DiffProgressEvent, but this is fine for now.
go func() {
for {
select {
case fse := <-fsmessages:
if s.filterEvent(fse, known) {
log.Debugf("filesys event: %s\n", fse)
for k, c := range connections {
err = wsjson.Write(ctx, c, fse)
if err != nil {
log.Errorf("connection %d: wsjson write error: %s", k, err)
}
}
}
}
// Collect all websocket connections
connections := []*websocket.Conn{}
srv := &http.Server{
Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
c, err := websocket.Accept(w, r, &websocket.AcceptOptions{
Subprotocols: []string{qriWebsocketProtocol},
InsecureSkipVerify: true,
})
if err != nil {
log.Debugf("Websocket accept error: %s", err)
return
}
}()

// TODO(dlong): Move to SummaryString
fmt.Printf("Listening for websocket connection at %s\n", l.Addr().String())
connections = append(connections, c)
}),
ReadTimeout: time.Second * 15,
WriteTimeout: time.Second * 15,
}
defer srv.Close()

// Start http server for websocket.
err = srv.Serve(l)
if err != http.ErrServerClosed {
log.Infof("failed to listen and serve: %v", err)
handler := func(_ context.Context, t event.Type, payload interface{}) error {
ctx := context.Background()
evt := map[string]interface{}{
"type": string(t),
"data": payload,
}
}()
}

func (s Server) startFilesysWatcher(ctx context.Context, node *p2p.QriNode) (chan watchfs.FilesysEvent, error) {
refs, err := node.Repo.References(0, 100)
if err != nil {
return nil, err
}
// Extract fsi paths for all working directories.
paths := make([]watchfs.EventPath, 0, len(refs))
for _, ref := range refs {
if ref.FSIPath != "" {
paths = append(paths, watchfs.EventPath{
Path: ref.FSIPath,
Username: ref.Peername,
Dsname: ref.Name,
})
log.Debugf("sending event %q to %d websocket conns", t, len(connections))
for k, c := range connections {
go func(k int, c *websocket.Conn) {
err := wsjson.Write(ctx, c, evt)
if err != nil {
log.Errorf("connection %d: wsjson write error: %s", k, err)
}
}(k, c)
}
return nil
}
// Watch those paths.
// TODO(dlong): When datasets are removed or renamed update the watchlist.
s.Instance.Watcher = watchfs.NewFilesysWatcher(ctx, s.Instance.Bus())
fsmessages := s.Instance.Watcher.Begin(paths)
return fsmessages, nil
}

func (s Server) filterEvent(event watchfs.FilesysEvent, knownFilenames map[string][]string) bool {
return component.IsKnownFilename(event.Source, knownFilenames)
s.Instance.Bus().Subscribe(handler,
event.ETFSICreateLinkEvent,
event.ETCreatedNewFile,
event.ETModifiedFile,
event.ETDeletedFile,
event.ETRenamedFolder,
event.ETRemovedFolder,
)

// Start http server for websocket.
err = srv.Serve(l)
if err != http.ErrServerClosed {
log.Infof("failed to listen and serve: %v", err)
}
}
3 changes: 3 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,9 @@ func (cfg Config) SummaryString() (summary string) {

if cfg.API != nil && cfg.API.Enabled {
summary += fmt.Sprintf("API address:\t%s\n", cfg.API.Address)
if cfg.API.WebsocketAddress != "" {
fmt.Printf("Listening for websocket connection at %s\n", cfg.API.WebsocketAddress)
}
}

if cfg.RPC != nil && cfg.RPC.Enabled {
Expand Down
40 changes: 40 additions & 0 deletions event/filesystem.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package event

import (
"time"
)

const (
// ETFSICreateLinkEvent type for when FSI creates a link between a dataset
// and working directory
ETFSICreateLinkEvent = Type("fsi:CreateLinkEvent")
)

// FSICreateLinkEvent describes an FSI created link
type FSICreateLinkEvent struct {
FSIPath string
Username string
Dsname string
}

const (
// ETCreatedNewFile is the event for creating a new file
ETCreatedNewFile = Type("watchfs:CreatedNewFile")
// ETModifiedFile is the event for modifying a file
ETModifiedFile = Type("watchfs:ModifiedFile")
// ETDeletedFile is the event for deleting a file
ETDeletedFile = Type("watchfs:DeletedFile")
// ETRenamedFolder is the event for renaming a folder
ETRenamedFolder = Type("watchfs:RenamedFolder")
// ETRemovedFolder is the event for removing a folder
ETRemovedFolder = Type("watchfs:RemovedFolder")
)

// WatchfsChange represents events for filesystem changes
type WatchfsChange struct {
Username string
Dsname string
Source string
Destination string
Time time.Time
}
14 changes: 0 additions & 14 deletions event/fsi.go

This file was deleted.

31 changes: 0 additions & 31 deletions watchfs/event.go

This file was deleted.

Loading

0 comments on commit 3cc9949

Please sign in to comment.