Skip to content

Commit

Permalink
test: Add integration testing for P2P. (#655)
Browse files Browse the repository at this point in the history
Relevant issue(s)
Resolves #564

Description
This PR adds P2P integration testing functionalities. It's limited to passive syncing of nodes on a given document. The test suite starts 2 Defra nodes, waits for them to connect and then proceeds to update the commonly shared document. We then ensure that the changes on one node are reflected on the other.
  • Loading branch information
fredcarle authored Jul 25, 2022
1 parent 7a627d6 commit 9d4a333
Show file tree
Hide file tree
Showing 9 changed files with 652 additions and 14 deletions.
3 changes: 1 addition & 2 deletions db/collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -325,7 +325,6 @@ func (c *collection) getAllDocKeysChan(
default:
// noop, just continue on the with the for loop
}

if res.Error != nil {
resCh <- client.DocKeysResult{
Err: res.Error,
Expand All @@ -334,7 +333,7 @@ func (c *collection) getAllDocKeysChan(
}

// now we have a doc key
rawDocKey := ds.NewKey(res.Key).Type()
rawDocKey := ds.NewKey(res.Key).BaseNamespace()
key, err := client.NewDocKeyFromString(rawDocKey)
if err != nil {
resCh <- client.DocKeysResult{
Expand Down
2 changes: 1 addition & 1 deletion merkle/clock/heads.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ func (hh *heads) Replace(ctx context.Context, h, c cid.Cid, height uint64) error
}

func (hh *heads) Add(ctx context.Context, c cid.Cid, height uint64) error {
log.Info(ctx, "Adding new DAG head",
log.Debug(ctx, "Adding new DAG head",
logging.NewKV("CID", c),
logging.NewKV("Height", height))
return hh.write(ctx, hh.store, c, height)
Expand Down
5 changes: 4 additions & 1 deletion merkle/crdt/merklecrdt.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ package crdt
import (
"context"
"fmt"
"time"

"github.com/sourcenetwork/defradb/core"
corenet "github.com/sourcenetwork/defradb/core/net"
Expand All @@ -26,6 +27,8 @@ var (
log = logging.MustNewLogger("defra.merklecrdt")
)

const broadcasterTimeout = time.Second

// MerkleCRDT is the implementation of a Merkle Clock along with a
// CRDT payload. It implements the ReplicatedData interface
// so it can be merged with any given semantics.
Expand Down Expand Up @@ -109,7 +112,7 @@ func (base *baseMerkleCRDT) Broadcast(ctx context.Context, nd ipld.Node, delta c
Block: nd,
Priority: netdelta.GetPriority(),
}
if err := base.broadcaster.Send(lg); err != nil {
if err := base.broadcaster.SendWithTimeout(lg, broadcasterTimeout); err != nil {
log.ErrorE(
ctx,
"Failed to broadcast MerkleCRDT update",
Expand Down
20 changes: 20 additions & 0 deletions net/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,18 @@ func (p *Peer) Close() error {
stopGRPCServer(p.ctx, p.p2pRPC)
// stopGRPCServer(p.tcpRPC)

// close event emitters
if p.server.pubSubEmitter != nil {
if err := p.server.pubSubEmitter.Close(); err != nil {
log.Info(p.ctx, "Could not close pubsub event emitter", logging.NewKV("Error", err))
}
}
if p.server.pushLogEmitter != nil {
if err := p.server.pushLogEmitter.Close(); err != nil {
log.Info(p.ctx, "Could not close push log event emitter", logging.NewKV("Error", err))
}
}

p.bus.Discard()
p.cancel()
return nil
Expand Down Expand Up @@ -463,3 +475,11 @@ func stopGRPCServer(ctx context.Context, server *grpc.Server) {
timer.Stop()
}
}

type EvtReceivedPushLog struct {
Peer peer.ID
}

type EvtPubSub struct {
Peer peer.ID
}
35 changes: 35 additions & 0 deletions net/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (

"github.com/gogo/protobuf/proto"
format "github.com/ipfs/go-ipld-format"
"github.com/libp2p/go-libp2p-core/event"
libpeer "github.com/libp2p/go-libp2p-core/peer"
rpc "github.com/textileio/go-libp2p-pubsub-rpc"
"google.golang.org/grpc"
Expand All @@ -45,6 +46,9 @@ type server struct {
mu sync.Mutex

conns map[libpeer.ID]*grpc.ClientConn

pubSubEmitter event.Emitter
pushLogEmitter event.Emitter
}

// newServer creates a new network server that handle/directs RPC requests to the
Expand Down Expand Up @@ -93,6 +97,16 @@ func newServer(p *Peer, db client.DB, opts ...grpc.DialOption) (*server, error)
log.Debug(p.ctx, "Finished registering all DocKey pubsub topics", logging.NewKV("Count", i))
}

var err error
s.pubSubEmitter, err = s.peer.host.EventBus().Emitter(new(EvtPubSub))
if err != nil {
log.Info(s.peer.ctx, "could not create event emitter", logging.NewKV("Error", err))
}
s.pushLogEmitter, err = s.peer.host.EventBus().Emitter(new(EvtReceivedPushLog))
if err != nil {
log.Info(s.peer.ctx, "could not create event emitter", logging.NewKV("Error", err))
}

return s, nil
}

Expand Down Expand Up @@ -177,6 +191,17 @@ func (s *server) PushLog(ctx context.Context, req *pb.PushLogRequest) (*pb.PushL
log.Debug(ctx, "No more children to process for log", logging.NewKV("CID", cid))
}

if s.pushLogEmitter != nil {
err = s.pushLogEmitter.Emit(EvtReceivedPushLog{
Peer: pid,
})
if err != nil {
// logging instead of returning an error because the event bus should
// not break the PushLog execution.
log.Info(ctx, "could not emit push log event", logging.NewKV("Error", err))
}
}

return &pb.PushLogReply{}, nil
}

Expand Down Expand Up @@ -303,7 +328,17 @@ func (s *server) pubSubEventHandler(from libpeer.ID, topic string, msg []byte) {
"Received new pubsub event",
logging.NewKV("SenderId", from),
logging.NewKV("Topic", topic),
logging.NewKV("Message", string(msg)),
)

if s.pubSubEmitter != nil {
err := s.pubSubEmitter.Emit(EvtPubSub{
Peer: from,
})
if err != nil {
log.Info(s.peer.ctx, "could not emit pubsub event", logging.NewKV("Error", err))
}
}
}

func (s *server) listAllDocKeys() (<-chan client.DocKeysResult, error) {
Expand Down
141 changes: 133 additions & 8 deletions node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,19 @@ package node

import (
"context"
"fmt"
"os"
"path/filepath"
"time"

ipfslite "github.com/hsanjuan/ipfs-lite"
ds "github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/namespace"
"github.com/libp2p/go-libp2p"
"github.com/libp2p/go-libp2p-core/crypto"
"github.com/libp2p/go-libp2p-core/event"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-peerstore/pstoreds"
pubsub "github.com/libp2p/go-libp2p-pubsub"
Expand All @@ -42,6 +46,8 @@ var (
log = logging.MustNewLogger("defra.node")
)

const evtWaitTimeout = 10 * time.Second

type Node struct {
// embed the DB interface into the node
client.DB
Expand All @@ -52,6 +58,15 @@ type Node struct {
pubsub *pubsub.PubSub
litepeer *ipfslite.Peer

// receives an event when the status of a peer connection changes.
peerEvent chan event.EvtPeerConnectednessChanged

// receives an event when a pubsub topic is added.
pubSubEvent chan net.EvtPubSub

// receives an event when a pushLog request has been processed.
pushLogEvent chan net.EvtReceivedPushLog

ctx context.Context
}

Expand Down Expand Up @@ -144,20 +159,130 @@ func NewNode(
return nil, fin.Cleanup(err)
}

return &Node{
Peer: peer,
host: h,
pubsub: ps,
DB: db,
litepeer: lite,
ctx: ctx,
}, nil
n := &Node{
pubSubEvent: make(chan net.EvtPubSub),
pushLogEvent: make(chan net.EvtReceivedPushLog),
peerEvent: make(chan event.EvtPeerConnectednessChanged),
Peer: peer,
host: h,
pubsub: ps,
DB: db,
litepeer: lite,
ctx: ctx,
}

n.subscribeToPeerConnectionEvents()
n.subscribeToPubSubEvents()
n.subscribeToPushLogEvents()

return n, nil
}

func (n *Node) Boostrap(addrs []peer.AddrInfo) {
n.litepeer.Bootstrap(addrs)
}

// PeerID returns the node's peer ID.
func (n *Node) PeerID() peer.ID {
return n.host.ID()
}

// subscribeToPeerConnectionEvents subscribes the node to the event bus for a peer connection change.
func (n *Node) subscribeToPeerConnectionEvents() {
sub, err := n.host.EventBus().Subscribe(new(event.EvtPeerConnectednessChanged))
if err != nil {
log.Info(
n.ctx,
fmt.Sprintf("failed to subscribe to peer connectedness changed event: %v", err),
)
}
go func() {
for e := range sub.Out() {
n.peerEvent <- e.(event.EvtPeerConnectednessChanged)
}
}()
}

// subscribeToPubSubEvents subscribes the node to the event bus for a pubsub.
func (n *Node) subscribeToPubSubEvents() {
sub, err := n.host.EventBus().Subscribe(new(net.EvtPubSub))
if err != nil {
log.Info(
n.ctx,
fmt.Sprintf("failed to subscribe to pubsub event: %v", err),
)
}
go func() {
for e := range sub.Out() {
n.pubSubEvent <- e.(net.EvtPubSub)
}
}()
}

// subscribeToPushLogEvents subscribes the node to the event bus for a push log request completion.
func (n *Node) subscribeToPushLogEvents() {
sub, err := n.host.EventBus().Subscribe(new(net.EvtReceivedPushLog))
if err != nil {
log.Info(
n.ctx,
fmt.Sprintf("failed to subscribe to push log event: %v", err),
)
}
go func() {
for e := range sub.Out() {
n.pushLogEvent <- e.(net.EvtReceivedPushLog)
}
}()
}

// WaitForPeerConnectionEvent listens to the event channel for a connection event from a given peer.
func (n *Node) WaitForPeerConnectionEvent(id peer.ID) error {
if n.host.Network().Connectedness(id) == network.Connected {
return nil
}
for {
select {
case evt := <-n.peerEvent:
if evt.Peer != id {
continue
}
return nil
case <-time.After(evtWaitTimeout):
return fmt.Errorf("waiting for peer connection timed out")
}
}
}

// WaitForPubSubEvent listens to the event channel for pub sub event from a given peer.
func (n *Node) WaitForPubSubEvent(id peer.ID) error {
for {
select {
case evt := <-n.pubSubEvent:
if evt.Peer != id {
continue
}
return nil
case <-time.After(evtWaitTimeout):
return fmt.Errorf("waiting for pubsub timed out")
}
}
}

// WaitForPushLogEvent listens to the event channel for a push log event from a given peer.
func (n *Node) WaitForPushLogEvent(id peer.ID) error {
for {
select {
case evt := <-n.pushLogEvent:
if evt.Peer != id {
continue
}
return nil
case <-time.After(evtWaitTimeout):
return fmt.Errorf("waiting for pushlog timed out")
}
}
}

// replace with proper keystore
func getHostKey(keypath string) (crypto.PrivKey, error) {
// If a local datastore is used, the key is written to a file
Expand Down
Loading

0 comments on commit 9d4a333

Please sign in to comment.