Skip to content

Commit

Permalink
feat(lib): add OptEventHandler for subscribing to events move WS to i…
Browse files Browse the repository at this point in the history
…nstance

shift toward subscribing to events at the point of instance construction with
an option to provide an event handler & event type subscription list as a
NewInstance option.

While we're at it, let's internalize the websocket connection, which might
actually be able to replace the RPC api in the long run? either way it
lets a bunch of fields remain unexported.
  • Loading branch information
b5 committed Jul 8, 2020
1 parent 3cc9949 commit 889c175
Show file tree
Hide file tree
Showing 6 changed files with 119 additions and 26 deletions.
2 changes: 1 addition & 1 deletion config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func (cfg Config) SummaryString() (summary string) {
if cfg.API != nil && cfg.API.Enabled {
summary += fmt.Sprintf("API address:\t%s\n", cfg.API.Address)
if cfg.API.WebsocketAddress != "" {
fmt.Printf("Listening for websocket connection at %s\n", cfg.API.WebsocketAddress)
summary += fmt.Sprintf("Websocket address:\t%s\n", cfg.API.WebsocketAddress)
}
}

Expand Down
6 changes: 6 additions & 0 deletions event/lifecycle.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package event


// ETInstanceConstructed is fired once a node is created
// payload is nil
var ETInstanceConstructed = Type("lib:InstanceConstructed")
37 changes: 28 additions & 9 deletions lib/lib.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,9 @@ type InstanceOptions struct {
remoteMockClient bool
// use OptRemoteOptions to set this
remoteOptsFunc func(*remote.Options)

eventHandler event.Handler
events []event.Type
}

// InstanceContextKey is used by context to set keys for constucting a lib.Instance
Expand Down Expand Up @@ -231,6 +234,20 @@ func OptLogbook(bk *logbook.Book) Option {
}
}

// OptEventHandler provides an event handler & list of event types to subscribe
// to. The canonical list of events a qri instance emits are defined in the
// github.com/qri-io/qri/event package
// plase note that event handlers in qri are synchronous. A handler function
// that takes a long time to return will slow down the performance of qri
// generally
func OptEventHandler(handler event.Handler, events ...event.Type) Option {
return func(o *InstanceOptions) error {
o.eventHandler = handler
o.events = events
return nil
}
}

// NewInstance creates a new Qri Instance, if no Option funcs are provided,
// New uses a default set of Option funcs. Any Option functions passed to this
// function must check whether their fields are nil or not.
Expand Down Expand Up @@ -343,6 +360,10 @@ func NewInstance(ctx context.Context, repoPath string, opts ...Option) (qri *Ins
}
}

if o.eventHandler != nil && o.events != nil {
inst.bus.Subscribe(o.eventHandler, o.events...)
}

if inst.qfs == nil {
inst.qfs, err = buildrepo.NewFilesystem(ctx, cfg)
if err != nil {
Expand Down Expand Up @@ -440,6 +461,12 @@ func NewInstance(ctx context.Context, repoPath string, opts ...Option) (qri *Ins
}

go inst.waitForAllDone()
go func() {
if err := inst.bus.Publish(ctx, event.ETInstanceConstructed, nil); err != nil {
log.Error(err)
}
}()

ok = true
return
}
Expand Down Expand Up @@ -581,7 +608,7 @@ type Instance struct {
logbook *logbook.Book
dscache *dscache.Dscache
bus event.Bus
Watcher *watchfs.FilesysWatcher
watcher *watchfs.FilesysWatcher

rpc *rpc.Client

Expand Down Expand Up @@ -640,14 +667,6 @@ func (inst *Instance) FSI() *fsi.FSI {
return inst.fsi
}

// Bus returns the event.Bus
func (inst *Instance) Bus() event.Bus {
if inst == nil {
return nil
}
return inst.bus
}

// ChangeConfig implements the ConfigSetter interface
func (inst *Instance) ChangeConfig(cfg *config.Config) (err error) {
cfg = cfg.WithPrivateValues(inst.cfg)
Expand Down
14 changes: 13 additions & 1 deletion lib/lib_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"os"
"path/filepath"
"reflect"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -77,7 +78,16 @@ func TestNewInstance(t *testing.T) {
}
cfg.Repo.Type = "mem"

got, err := NewInstance(ctx, tr.QriPath, OptConfig(cfg))
var firedEventWg sync.WaitGroup
firedEventWg.Add(1)
handler := func(_ context.Context, t event.Type, _ interface{}) error {
if t == event.ETInstanceConstructed {
firedEventWg.Done()
}
return nil
}

got, err := NewInstance(ctx, tr.QriPath, OptConfig(cfg), OptEventHandler(handler, event.ETInstanceConstructed))
if err != nil {
t.Fatal(err)
}
Expand All @@ -90,6 +100,8 @@ func TestNewInstance(t *testing.T) {
t.Error(err)
}

firedEventWg.Wait()

finished := make(chan struct{})
go func() {
select {
Expand Down
43 changes: 28 additions & 15 deletions api/websocket.go → lib/websocket.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package api
package lib

import (
"context"
Expand All @@ -16,29 +16,31 @@ import (

const qriWebsocketProtocol = "qri-websocket"

// ServeWebsocket creates a websocket that clients can connect to in order to get realtime events
func (s Server) ServeWebsocket(ctx context.Context) {
cfg := s.Config().API
// ServeWebsocket creates a websocket that clients can connect to in order to
// get realtime events
func (inst *Instance) ServeWebsocket(ctx context.Context) {
apiCfg := inst.cfg.API

// Watch the filesystem. Events will be sent to websocket connections.
watcher, err := watchfs.NewFilesysWatcher(ctx, s.Instance.Bus())
// Watch the filesystem. Events will be sent to websocket connections
// TODO (b5) - watchfs constrcution shouldn't happen here
watcher, err := watchfs.NewFilesysWatcher(ctx, inst.bus)
if err != nil {
log.Errorf("Watching filesystem error: %s", err)
return
}
s.Instance.Watcher = watcher
if err = s.Instance.Watcher.WatchAllFSIPaths(ctx, s.Repo()); err != nil {
inst.watcher = watcher
if err = inst.watcher.WatchAllFSIPaths(ctx, inst.repo); err != nil {
log.Error(err)
}

addr, err := ma.NewMultiaddr(cfg.WebsocketAddress)
addr, err := ma.NewMultiaddr(apiCfg.WebsocketAddress)
if err != nil {
log.Errorf("cannot start Websocket: error parsing Websocket address %s: %w", cfg.WebsocketAddress, err.Error())
log.Errorf("cannot start Websocket: error parsing Websocket address %s: %w", apiCfg.WebsocketAddress, err.Error())
}

mal, err := manet.Listen(addr)
if err != nil {
log.Infof("Websocket listen on address %d error: %w", cfg.WebsocketAddress, err.Error())
log.Infof("Websocket listen on address %d error: %w", apiCfg.WebsocketAddress, err.Error())
return
}
l := manet.NetListener(mal)
Expand Down Expand Up @@ -82,7 +84,7 @@ func (s Server) ServeWebsocket(ctx context.Context) {
return nil
}

s.Instance.Bus().Subscribe(handler,
inst.bus.Subscribe(handler,
event.ETFSICreateLinkEvent,
event.ETCreatedNewFile,
event.ETModifiedFile,
Expand All @@ -92,8 +94,19 @@ func (s Server) ServeWebsocket(ctx context.Context) {
)

// Start http server for websocket.
err = srv.Serve(l)
if err != http.ErrServerClosed {
log.Infof("failed to listen and serve: %v", err)
go func() {
err = srv.Serve(l)
if err != http.ErrServerClosed {
log.Infof("failed to listen and serve: %v", err)
}
}()

<-ctx.Done()

shutdownCtx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()

if err = srv.Shutdown(shutdownCtx); err != nil {
log.Error(err)
}
}
43 changes: 43 additions & 0 deletions lib/websocket_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package lib

import (
"context"
"testing"

"github.com/qri-io/qfs"
"github.com/qri-io/qri/config"
repotest "github.com/qri-io/qri/repo/test"
)

func TestWebsocket(t *testing.T) {
tr, err := repotest.NewTempRepo("foo", "websocket_test", repotest.NewTestCrypto())
if err != nil {
t.Fatal(err)
}
defer tr.Delete()

instCtx, instCancel := context.WithCancel(context.Background())
defer instCancel()

cfg := config.DefaultConfigForTesting()
cfg.Filesystems = []qfs.Config{
{Type: "mem"},
{Type: "local"},
}
cfg.Repo.Type = "mem"

inst, err := NewInstance(instCtx, tr.QriPath, OptConfig(cfg))
if err != nil {
t.Fatal(err)
}

wsCtx, wsCancel := context.WithCancel(context.Background())
done := make(chan struct{})
go func() {
inst.ServeWebsocket(wsCtx)
done <- struct{}{}
}()

wsCancel()
<-done
}

0 comments on commit 889c175

Please sign in to comment.