Skip to content

Commit

Permalink
feat(event): add p2p events to events and event bus to QriNode (#…
Browse files Browse the repository at this point in the history
…1440)

* feat(event): add p2p events to `events` and event bus to `QriNode`

Also updates all tests to new `NewQriNode` api that requires an event bus. Most tests now use `event.NilBus`

* test(p2p events): ensure events published by `QriNode` are firing
Commit also includes notes of which events are not currently ever
implemented to fire. These events will be added in a follow up pr.
  • Loading branch information
ramfox authored Jul 10, 2020
1 parent c802b8d commit c444288
Show file tree
Hide file tree
Showing 23 changed files with 117 additions and 45 deletions.
5 changes: 3 additions & 2 deletions api/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
manet "github.com/multiformats/go-multiaddr-net"
"github.com/qri-io/qri/base/dsfs"
"github.com/qri-io/qri/config"
"github.com/qri-io/qri/event"
"github.com/qri-io/qri/lib"
"github.com/qri-io/qri/p2p"
"github.com/qri-io/qri/repo"
Expand Down Expand Up @@ -80,7 +81,7 @@ func newTestInstanceWithProfileFromNode(ctx context.Context, node *p2p.QriNode)
func newTestNodeWithNumDatasets(t *testing.T, _ int) (node *p2p.QriNode, teardown func()) {
var r repo.Repo
r, teardown = newTestRepo(t)
node, err := p2p.NewQriNode(r, config.DefaultP2PForTesting())
node, err := p2p.NewQriNode(r, config.DefaultP2PForTesting(), event.NilBus)
if err != nil {
t.Fatal(err.Error())
}
Expand Down Expand Up @@ -203,7 +204,7 @@ func TestServerReadOnlyRoutes(t *testing.T) {
cfg.API.ReadOnly = false
}()

node, err := p2p.NewQriNode(r, cfg.P2P)
node, err := p2p.NewQriNode(r, cfg.P2P, event.NilBus)
if err != nil {
t.Fatal(err.Error())
}
Expand Down
35 changes: 35 additions & 0 deletions event/p2p.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package event

var (
// ETP2PGoneOnline occurs when a p2p node opens up for peer-2-peer connections
// payload will be []multiaddr.Addr, the listening addresses of this peer
ETP2PGoneOnline = Type("p2p:GoneOnline")
// ETP2PGoneOffline occurs when a p2p node has finished disconnecting from
// a peer-2-peer network
// payload will be nil
// NOTE: not currently firing
ETP2PGoneOffline = Type("p2p:GoneOffline")
// ETP2PQriPeerConnected occurs after a qri peer has connected to this node
// payload will be a fully hydrated *profile.Profile from
// "github.com/qri-io/qri/repo/profile"
// NOTE: not currently firing
ETP2PQriPeerConnected = Type("p2p:QriPeerConnected")
// ETP2PQriPeerDisconnected occurs after a qri peer has connected to this node
// payload will be a fully hydrated *profile.Profile from
// "github.com/qri-io/qri/repo/profile"
// NOTE: not currently firing
ETP2PQriPeerDisconnected = Type("p2p:QriPeerDisconnected")
// ETP2PPeerConnected occurs after any peer has connected to this node
// payload will be a libp2p.peerInfo
// NOTE: not currently firing
ETP2PPeerConnected = Type("p2p:PeerConnected")
// ETP2PPeerDisconnected occurs after any peer has connected to this node
// payload will be a libp2p.peerInfo
// NOTE: not currently firing
ETP2PPeerDisconnected = Type("p2p:PeerDisconnected")
// ETP2PMessageReceived fires whenever the p2p protocol receives a message
// from a Qri peer
// payload will be a p2p.Message
// NOTE: not currently firing
ETP2PMessageReceived = Type("p2p:MessageReceived")
)
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ require (
github.com/fsnotify/fsnotify v1.4.9
github.com/ghodss/yaml v1.0.0
github.com/gofrs/flock v0.7.1 // indirect
github.com/google/flatbuffers v1.12.0
github.com/google/flatbuffers v1.12.1-0.20200706154056-969d0f7a6317
github.com/google/go-cmp v0.5.0
github.com/ipfs/go-cid v0.0.6
github.com/ipfs/go-datastore v0.4.4
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,8 @@ github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Z
github.com/google/btree v1.0.0/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
github.com/google/flatbuffers v1.12.0 h1:/PtAHvnBY4Kqnx/xCQ3OIV9uYcSFGScBsWI3Oogeh6w=
github.com/google/flatbuffers v1.12.0/go.mod h1:1AeVuKshWv4vARoZatz6mlQ0JxURH0Kv5+zNeJKJCa8=
github.com/google/flatbuffers v1.12.1-0.20200706154056-969d0f7a6317 h1:jf8+d1G6Vwheoz18uzRpIH2EhxNKEXBMx+4wS1a+2iQ=
github.com/google/flatbuffers v1.12.1-0.20200706154056-969d0f7a6317/go.mod h1:1AeVuKshWv4vARoZatz6mlQ0JxURH0Kv5+zNeJKJCa8=
github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M=
github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
Expand Down Expand Up @@ -485,6 +487,7 @@ github.com/jessevdk/go-flags v0.0.0-20141203071132-1679536dcc89/go.mod h1:4FA24M
github.com/jessevdk/go-flags v1.4.0/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI=
github.com/jinzhu/copier v0.0.0-20180308034124-7e38e58719c3 h1:sHsPfNMAG70QAvKbddQ0uScZCHQoZsT5NykGRCeeeIs=
github.com/jinzhu/copier v0.0.0-20180308034124-7e38e58719c3/go.mod h1:yL958EeXv8Ylng6IfnvG4oflryUi3vgA3xPs9hmII1s=
github.com/jinzhu/copier v0.0.0-20190924061706-b57f9002281a h1:zPPuIq2jAWWPTrGt70eK/BSch+gFAGrNzecsoENgu2o=
github.com/jinzhu/copier v0.0.0-20190924061706-b57f9002281a/go.mod h1:yL958EeXv8Ylng6IfnvG4oflryUi3vgA3xPs9hmII1s=
github.com/jonboulle/clockwork v0.1.0/go.mod h1:Ii8DK3G1RaLaWxj9trq07+26W01tbo22gdxWY5EU2bo=
github.com/jrick/logrotate v1.0.0/go.mod h1:LNinyqDIJnpAur+b8yyulnQw/wDuN1+BYKlTRt3OuAQ=
Expand Down Expand Up @@ -1043,6 +1046,7 @@ github.com/russross/blackfriday/v2 v2.0.2-0.20190629151518-3e56bb68c887/go.mod h
github.com/rwcarlsen/goexif v0.0.0-20190401172101-9e8deecbddbd/go.mod h1:hPqNNc0+uJM6H+SuU8sEs5K5IQeKccPqeSjfgcKGgPk=
github.com/sergi/go-diff v1.0.0 h1:Kpca3qRNrduNnOQeazBd0ysaKrUJiIuISHxogkT9RPQ=
github.com/sergi/go-diff v1.0.0/go.mod h1:0CfEIISq7TuYL3j771MWULgwwjU+GofnZX9QAmXWZgo=
github.com/sergi/go-diff v1.1.0 h1:we8PVUC3FE2uYfodKH/nBHMSetSfHDR6scGdBi+erh0=
github.com/sergi/go-diff v1.1.0/go.mod h1:STckp+ISIX8hZLjrqAeVduY0gWCT9IjLuqbuNXdaHfM=
github.com/shurcooL/component v0.0.0-20170202220835-f88ec8f54cc4/go.mod h1:XhFIlyj5a1fBNx5aJTbKoIq0mNaPvOagO+HjB3EtxrY=
github.com/shurcooL/events v0.0.0-20181021180414-410e4ca65f48/go.mod h1:5u70Mqkb5O5cxEA8nxTsgrgLehJeAw6Oc4Ab1c/P1HM=
Expand Down
7 changes: 4 additions & 3 deletions lib/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"testing"

"github.com/qri-io/qri/config"
"github.com/qri-io/qri/event"
"github.com/qri-io/qri/p2p"
testrepo "github.com/qri-io/qri/repo/test"
)
Expand All @@ -21,7 +22,7 @@ func TestGetConfig(t *testing.T) {
t.Fatalf("error allocating test repo: %s", err)
return
}
node, err := p2p.NewQriNode(mr, cfg.P2P)
node, err := p2p.NewQriNode(mr, cfg.P2P, event.NilBus)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -56,7 +57,7 @@ func TestSaveConfigToFile(t *testing.T) {
t.Fatalf("error allocating test repo: %s", err)
return
}
node, err := p2p.NewQriNode(mr, cfg.P2P)
node, err := p2p.NewQriNode(mr, cfg.P2P, event.NilBus)
if err != nil {
t.Fatal(err)
}
Expand All @@ -80,7 +81,7 @@ func TestSetConfig(t *testing.T) {
t.Fatalf("error allocating test repo: %s", err)
return
}
node, err := p2p.NewQriNode(mr, cfg.P2P)
node, err := p2p.NewQriNode(mr, cfg.P2P, event.NilBus)
if err != nil {
t.Fatal(err)
}
Expand Down
23 changes: 12 additions & 11 deletions lib/datasets_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/qri-io/qri/base/dsfs"
"github.com/qri-io/qri/config"
"github.com/qri-io/qri/dsref"
"github.com/qri-io/qri/event"
"github.com/qri-io/qri/p2p"
p2ptest "github.com/qri-io/qri/p2p/test"
reporef "github.com/qri-io/qri/repo/ref"
Expand All @@ -40,7 +41,7 @@ func TestDatasetRequestsSave(t *testing.T) {
if err != nil {
t.Fatalf("error allocating test repo: %s", err.Error())
}
node, err := p2p.NewQriNode(mr, config.DefaultP2PForTesting())
node, err := p2p.NewQriNode(mr, config.DefaultP2PForTesting(), event.NilBus)
if err != nil {
t.Fatal(err.Error())
}
Expand Down Expand Up @@ -246,7 +247,7 @@ func TestDatasetRequestsSaveZip(t *testing.T) {
if err != nil {
t.Fatalf("error allocating test repo: %s", err.Error())
}
node, err := p2p.NewQriNode(mr, config.DefaultP2PForTesting())
node, err := p2p.NewQriNode(mr, config.DefaultP2PForTesting(), event.NilBus)
if err != nil {
t.Fatal(err.Error())
}
Expand Down Expand Up @@ -287,7 +288,7 @@ func TestDatasetRequestsList(t *testing.T) {
t.Fatalf("error getting namespace: %s", err)
}

node, err := p2p.NewQriNode(mr, config.DefaultP2PForTesting())
node, err := p2p.NewQriNode(mr, config.DefaultP2PForTesting(), event.NilBus)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -431,7 +432,7 @@ func TestDatasetRequestsGet(t *testing.T) {
if err != nil {
t.Fatalf("error allocating test repo: %s", err.Error())
}
node, err := p2p.NewQriNode(mr, config.DefaultP2PForTesting())
node, err := p2p.NewQriNode(mr, config.DefaultP2PForTesting(), event.NilBus)
if err != nil {
t.Fatal(err.Error())
}
Expand Down Expand Up @@ -568,7 +569,7 @@ func TestDatasetRequestsGetFSIPath(t *testing.T) {
if err != nil {
t.Fatalf("error allocating test repo: %s", err.Error())
}
node, err := p2p.NewQriNode(mr, config.DefaultP2PForTesting())
node, err := p2p.NewQriNode(mr, config.DefaultP2PForTesting(), event.NilBus)
if err != nil {
t.Fatal(err.Error())
}
Expand Down Expand Up @@ -717,7 +718,7 @@ func TestDatasetRequestsRename(t *testing.T) {
if err != nil {
t.Fatalf("error allocating test repo: %s", err.Error())
}
node, err := p2p.NewQriNode(mr, config.DefaultP2PForTesting())
node, err := p2p.NewQriNode(mr, config.DefaultP2PForTesting(), event.NilBus)
if err != nil {
t.Fatal(err.Error())
}
Expand Down Expand Up @@ -827,7 +828,7 @@ func TestDatasetRequestsRemove(t *testing.T) {
if err != nil {
t.Fatalf("error allocating test repo: %s", err.Error())
}
node, err := p2p.NewQriNode(mr, config.DefaultP2PForTesting())
node, err := p2p.NewQriNode(mr, config.DefaultP2PForTesting(), event.NilBus)
if err != nil {
t.Fatal(err.Error())
}
Expand Down Expand Up @@ -956,7 +957,7 @@ func TestDatasetRequestsAdd(t *testing.T) {
if err != nil {
t.Fatalf("error allocating test repo: %s", err.Error())
}
node, err := p2p.NewQriNode(mr, config.DefaultP2PForTesting())
node, err := p2p.NewQriNode(mr, config.DefaultP2PForTesting(), event.NilBus)
if err != nil {
t.Fatal(err.Error())
}
Expand Down Expand Up @@ -1102,7 +1103,7 @@ Pirates of the Caribbean: At World's End ,foo
if err != nil {
t.Fatalf("error allocating test repo: %s", err.Error())
}
node, err := p2p.NewQriNode(mr, config.DefaultP2PForTesting())
node, err := p2p.NewQriNode(mr, config.DefaultP2PForTesting(), event.NilBus)
if err != nil {
t.Fatal(err.Error())
}
Expand Down Expand Up @@ -1133,7 +1134,7 @@ func TestDatasetRequestsStats(t *testing.T) {
if err != nil {
t.Fatalf("error allocating test repo: %s", err.Error())
}
node, err := p2p.NewQriNode(mr, config.DefaultP2PForTesting())
node, err := p2p.NewQriNode(mr, config.DefaultP2PForTesting(), event.NilBus)
if err != nil {
t.Fatal(err.Error())
}
Expand Down Expand Up @@ -1208,7 +1209,7 @@ func TestListRawRefs(t *testing.T) {
if err != nil {
t.Fatalf("error allocating test repo: %s", err.Error())
}
node, err := p2p.NewQriNode(mr, config.DefaultP2PForTesting())
node, err := p2p.NewQriNode(mr, config.DefaultP2PForTesting(), event.NilBus)
if err != nil {
t.Fatal(err.Error())
}
Expand Down
3 changes: 2 additions & 1 deletion lib/export_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/qri-io/dataset"
"github.com/qri-io/qri/base/dsfs"
"github.com/qri-io/qri/config"
"github.com/qri-io/qri/event"
"github.com/qri-io/qri/p2p"
testrepo "github.com/qri-io/qri/repo/test"
)
Expand All @@ -28,7 +29,7 @@ func TestExport(t *testing.T) {
t.Fatalf("error allocating test repo: %s", err.Error())
}

node, err := p2p.NewQriNode(mr, config.DefaultP2PForTesting())
node, err := p2p.NewQriNode(mr, config.DefaultP2PForTesting(), event.NilBus)
if err != nil {
t.Fatal(err.Error())
}
Expand Down
3 changes: 2 additions & 1 deletion lib/fsi_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
cmpopts "github.com/google/go-cmp/cmp/cmpopts"
"github.com/qri-io/dataset"
"github.com/qri-io/qri/config"
"github.com/qri-io/qri/event"
"github.com/qri-io/qri/p2p"
testrepo "github.com/qri-io/qri/repo/test"
)
Expand All @@ -24,7 +25,7 @@ func TestFSIMethodsWrite(t *testing.T) {
if err != nil {
t.Fatalf("error allocating test repo: %s", err.Error())
}
node, err := p2p.NewQriNode(mr, config.DefaultP2PForTesting())
node, err := p2p.NewQriNode(mr, config.DefaultP2PForTesting(), event.NilBus)
if err != nil {
t.Fatal(err.Error())
}
Expand Down
2 changes: 1 addition & 1 deletion lib/lib.go
Original file line number Diff line number Diff line change
Expand Up @@ -442,7 +442,7 @@ func NewInstance(ctx context.Context, repoPath string, opts ...Option) (qri *Ins
}

if inst.node == nil {
if inst.node, err = p2p.NewQriNode(inst.repo, cfg.P2P); err != nil {
if inst.node, err = p2p.NewQriNode(inst.repo, cfg.P2P, inst.bus); err != nil {
log.Error("intializing p2p:", err.Error())
return
}
Expand Down
5 changes: 3 additions & 2 deletions lib/log_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/google/go-cmp/cmp/cmpopts"
"github.com/qri-io/qri/config"
"github.com/qri-io/qri/dsref"
"github.com/qri-io/qri/event"
"github.com/qri-io/qri/p2p"
reporef "github.com/qri-io/qri/repo/ref"
testrepo "github.com/qri-io/qri/repo/test"
Expand All @@ -24,7 +25,7 @@ func TestHistoryRequestsLog(t *testing.T) {
return
}

node, err := p2p.NewQriNode(mr, config.DefaultP2PForTesting())
node, err := p2p.NewQriNode(mr, config.DefaultP2PForTesting(), event.NilBus)
if err != nil {
t.Fatal(err.Error())
}
Expand Down Expand Up @@ -102,7 +103,7 @@ func TestHistoryRequestsLogEntries(t *testing.T) {
return
}

node, err := p2p.NewQriNode(mr, config.DefaultP2PForTesting())
node, err := p2p.NewQriNode(mr, config.DefaultP2PForTesting(), event.NilBus)
if err != nil {
t.Fatal(err.Error())
}
Expand Down
11 changes: 6 additions & 5 deletions lib/profile_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"time"

"github.com/qri-io/qri/config"
"github.com/qri-io/qri/event"
"github.com/qri-io/qri/p2p"
"github.com/qri-io/qri/registry"
regmock "github.com/qri-io/qri/registry/regserver"
Expand Down Expand Up @@ -37,7 +38,7 @@ func TestProfileRequestsGet(t *testing.T) {
}

cfg := config.DefaultConfigForTesting()
node, err := p2p.NewQriNode(mr, cfg.P2P)
node, err := p2p.NewQriNode(mr, cfg.P2P, event.NilBus)
if err != nil {
t.Fatal(err.Error())
}
Expand Down Expand Up @@ -77,7 +78,7 @@ func TestProfileRequestsSave(t *testing.T) {
if err != nil {
t.Fatalf("error allocating test repo: %s", err.Error())
}
node, err := p2p.NewQriNode(mr, cfg.P2P)
node, err := p2p.NewQriNode(mr, cfg.P2P, event.NilBus)
if err != nil {
t.Fatal(err.Error())
}
Expand Down Expand Up @@ -126,7 +127,7 @@ func TestSaveProfile(t *testing.T) {
if err != nil {
t.Fatalf("error allocating test repo: %s", err.Error())
}
node, err := p2p.NewQriNode(mr, cfg.P2P)
node, err := p2p.NewQriNode(mr, cfg.P2P, event.NilBus)
if err != nil {
t.Fatal(err.Error())
}
Expand Down Expand Up @@ -274,7 +275,7 @@ func TestProfileRequestsSetProfilePhoto(t *testing.T) {
if err != nil {
t.Fatalf("error allocating test repo: %s", err.Error())
}
node, err := p2p.NewQriNode(mr, cfg.P2P)
node, err := p2p.NewQriNode(mr, cfg.P2P, event.NilBus)
if err != nil {
t.Fatal(err.Error())
}
Expand Down Expand Up @@ -330,7 +331,7 @@ func TestProfileRequestsSetPosterPhoto(t *testing.T) {
if err != nil {
t.Fatalf("error allocating test repo: %s", err.Error())
}
node, err := p2p.NewQriNode(mr, cfg.P2P)
node, err := p2p.NewQriNode(mr, cfg.P2P, event.NilBus)
if err != nil {
t.Fatal(err.Error())
}
Expand Down
4 changes: 2 additions & 2 deletions lib/remote_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ package lib
// // Set a seed so that the sessionID is deterministic
// rand.Seed(5678)

// node, err := p2p.NewQriNode(mr, cfg.P2P)
// node, err := p2p.NewQriNode(mr, cfg.P2P, event.NilBus)
// if err != nil {
// t.Fatal(err.Error())
// }
Expand Down Expand Up @@ -96,7 +96,7 @@ package lib
// t.Fatalf("error getting namespace: %s", err.Error())
// }

// node, err := p2p.NewQriNode(mr, config.DefaultP2PForTesting())
// node, err := p2p.NewQriNode(mr, config.DefaultP2PForTesting(), event.NilBus)
// if err != nil {
// t.Fatal(err.Error())
// }
Expand Down
Loading

0 comments on commit c444288

Please sign in to comment.