refactor: Network test sync logic (sourcenetwork#2748)
## Relevant issue(s)

Resolves sourcenetwork#2747

## Description

This PR refactors the network sync test logic to allow for further
improvements to the network implementation. Alongside the test changes, it
notifies the block exchange of new blocks before pushing logs to replicators,
eagerly connects to replicator peers when they are added, and publishes the
merge event only after the DAG sync and topic subscription have completed.

## Tasks

- [x] I made sure the code is well commented, particularly
hard-to-understand areas.
- [x] I made sure the repository-held documentation is changed
accordingly.
- [x] I made sure the pull request title adheres to the conventional
commit style (the subset used in the project can be found in
[tools/configs/chglog/config.yml](tools/configs/chglog/config.yml)).
- [x] I made sure to discuss its limitations such as threats to
validity, vulnerability to mistake and misuse, robustness to
invalidation of assumptions, resource requirements, ...

## How has this been tested?

`make test`

Specify the platform(s) on which this was tested:
- macOS
nasdf authored Jul 18, 2024
1 parent a7fe539 commit 517333c
Showing 10 changed files with 457 additions and 384 deletions.
13 changes: 13 additions & 0 deletions net/peer.go
@@ -24,6 +24,7 @@ import (
     "github.com/ipfs/boxo/blockservice"
     exchange "github.com/ipfs/boxo/exchange"
     "github.com/ipfs/boxo/ipns"
+    blocks "github.com/ipfs/go-block-format"
     "github.com/ipfs/go-cid"
     ds "github.com/ipfs/go-datastore"
     libp2p "github.com/libp2p/go-libp2p"
@@ -446,6 +447,13 @@ func (p *Peer) handleDocUpdateLog(evt event.Update) error {
 }
 
 func (p *Peer) pushLogToReplicators(lg event.Update) {
+    // let the exchange know we have this block
+    // this should speed up the dag sync process
+    err := p.bserv.Exchange().NotifyNewBlocks(p.ctx, blocks.NewBlock(lg.Block))
+    if err != nil {
+        log.ErrorContextE(p.ctx, "Failed to notify new blocks", err)
+    }
+
     // push to each peer (replicator)
     peers := make(map[string]struct{})
     for _, peer := range p.ps.ListPeers(lg.DocID) {
@@ -504,6 +512,11 @@ func stopGRPCServer(ctx context.Context, server *grpc.Server) {
     }
 }
 
+// Connect initiates a connection to the peer with the given address.
+func (p *Peer) Connect(ctx context.Context, addr peer.AddrInfo) error {
+    return p.host.Connect(ctx, addr)
+}
+
 // Bootstrap connects to the given peers.
 func (p *Peer) Bootstrap(addrs []peer.AddrInfo) {
     var connected uint64
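
The `NotifyNewBlocks` call above is the interesting part of this file: announcing the freshly created block to the exchange (bitswap) before pushing the log means a replicator that starts a DAG sync can fetch the head block directly from this node instead of waiting for provider discovery. A minimal sketch of the pattern, assuming a boxo `blockservice.BlockService` is already wired up; the `announceHead` helper is hypothetical:

```go
package sketch

import (
    "context"

    "github.com/ipfs/boxo/blockservice"
    blocks "github.com/ipfs/go-block-format"
)

// announceHead wraps raw block bytes and tells the exchange (e.g. bitswap)
// that the block is now available locally, so peers requesting it during a
// DAG sync are served immediately rather than after provider lookup.
func announceHead(ctx context.Context, bserv blockservice.BlockService, raw []byte) error {
    blk := blocks.NewBlock(raw) // the CID is derived by hashing the raw bytes
    return bserv.Exchange().NotifyNewBlocks(ctx, blk)
}
```

In the commit itself this happens inline in `pushLogToReplicators`, using the update event's raw block bytes.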
75 changes: 57 additions & 18 deletions net/server.go
@@ -125,18 +125,24 @@ func (s *server) PushLog(ctx context.Context, req *pb.PushLogRequest) (*pb.PushL
     if err != nil {
         return nil, err
     }
 
+    log.InfoContext(ctx, "Received pushlog",
+        corelog.Any("PeerID", pid.String()),
+        corelog.Any("Creator", byPeer.String()),
+        corelog.Any("DocID", docID.String()))
+
+    log.InfoContext(ctx, "Starting DAG sync",
+        corelog.Any("PeerID", pid.String()),
+        corelog.Any("DocID", docID.String()))
+
     err = syncDAG(ctx, s.peer.bserv, block)
     if err != nil {
         return nil, err
     }
 
-    s.peer.bus.Publish(event.NewMessage(event.MergeName, event.Merge{
-        DocID:      docID.String(),
-        ByPeer:     byPeer,
-        FromPeer:   pid,
-        Cid:        headCID,
-        SchemaRoot: string(req.Body.SchemaRoot),
-    }))
+    log.InfoContext(ctx, "DAG sync complete",
+        corelog.Any("PeerID", pid.String()),
+        corelog.Any("DocID", docID.String()))
+
     // Once processed, subscribe to the DocID topic on the pubsub network unless we are already
     // subscribed to the collection.
@@ -146,6 +152,15 @@ func (s *server) PushLog(ctx context.Context, req *pb.PushLogRequest) (*pb.PushL
             return nil, err
         }
     }
+
+    s.peer.bus.Publish(event.NewMessage(event.MergeName, event.Merge{
+        DocID:      docID.String(),
+        ByPeer:     byPeer,
+        FromPeer:   pid,
+        Cid:        headCID,
+        SchemaRoot: string(req.Body.SchemaRoot),
+    }))
+
     return &pb.PushLogReply{}, nil
 }

@@ -163,6 +178,10 @@ func (s *server) addPubSubTopic(topic string, subscribe bool) error {
         return nil
     }
 
+    log.InfoContext(s.peer.ctx, "Adding pubsub topic",
+        corelog.String("PeerID", s.peer.PeerID().String()),
+        corelog.String("Topic", topic))
+
     s.mu.Lock()
     defer s.mu.Unlock()
     if t, ok := s.topics[topic]; ok {
@@ -205,6 +224,10 @@ func (s *server) removePubSubTopic(topic string) error {
         return nil
     }
 
+    log.InfoContext(s.peer.ctx, "Removing pubsub topic",
+        corelog.String("PeerID", s.peer.PeerID().String()),
+        corelog.String("Topic", topic))
+
     s.mu.Lock()
     defer s.mu.Unlock()
     if t, ok := s.topics[topic]; ok {
@@ -218,6 +241,10 @@ func (s *server) removeAllPubsubTopics() error {
     if s.peer.ps == nil {
         return nil
     }
+
+    log.InfoContext(s.peer.ctx, "Removing all pubsub topics",
+        corelog.String("PeerID", s.peer.PeerID().String()))
+
     s.mu.Lock()
     defer s.mu.Unlock()
     for id, t := range s.topics {
@@ -232,6 +259,10 @@
 // publishLog publishes the given PushLogRequest object on the PubSub network via the
 // corresponding topic
 func (s *server) publishLog(ctx context.Context, topic string, req *pb.PushLogRequest) error {
+    log.InfoContext(ctx, "Publish log",
+        corelog.String("PeerID", s.peer.PeerID().String()),
+        corelog.String("Topic", topic))
+
     if s.peer.ps == nil { // skip if we aren't running with a pubsub net
         return nil
     }
@@ -259,12 +290,16 @@ func (s *server) publishLog(ctx context.Context, topic string, req *pb.PushLogRe
 
 // pubSubMessageHandler handles incoming PushLog messages from the pubsub network.
 func (s *server) pubSubMessageHandler(from libpeer.ID, topic string, msg []byte) ([]byte, error) {
+    log.InfoContext(s.peer.ctx, "Received new pubsub event",
+        corelog.String("PeerID", s.peer.PeerID().String()),
+        corelog.Any("SenderId", from),
+        corelog.String("Topic", topic))
+
     req := new(pb.PushLogRequest)
     if err := proto.Unmarshal(msg, req); err != nil {
         log.ErrorContextE(s.peer.ctx, "Failed to unmarshal pubsub message", err)
         return nil, err
     }
-
     ctx := grpcpeer.NewContext(s.peer.ctx, &grpcpeer.Peer{
         Addr: addr{from},
     })
@@ -276,9 +311,8 @@ func (s *server) pubSubMessageHandler(from libpeer.ID, topic string, msg []byte)
 
 // pubSubEventHandler logs events from the subscribed DocID topics.
 func (s *server) pubSubEventHandler(from libpeer.ID, topic string, msg []byte) {
-    log.InfoContext(
-        s.peer.ctx,
-        "Received new pubsub event",
+    log.InfoContext(s.peer.ctx, "Received new pubsub event",
+        corelog.String("PeerID", s.peer.PeerID().String()),
         corelog.Any("SenderId", from),
         corelog.String("Topic", topic),
         corelog.String("Message", string(msg)),
@@ -329,7 +363,18 @@ func (s *server) updatePubSubTopics(evt event.P2PTopic) {
 }
 
 func (s *server) updateReplicators(evt event.Replicator) {
-    isDeleteRep := len(evt.Schemas) == 0
+    if len(evt.Schemas) == 0 {
+        // remove peer from store
+        s.peer.host.Peerstore().ClearAddrs(evt.Info.ID)
+    } else {
+        // add peer to store
+        s.peer.host.Peerstore().AddAddrs(evt.Info.ID, evt.Info.Addrs, peerstore.PermanentAddrTTL)
+        // connect to the peer
+        if err := s.peer.Connect(s.peer.ctx, evt.Info); err != nil {
+            log.ErrorContextE(s.peer.ctx, "Failed to connect to replicator peer", err)
+        }
+    }
+
     // update the cached replicators
     s.mu.Lock()
     for schema, peers := range s.replicators {
@@ -350,12 +395,6 @@ func (s *server) updateReplicators(evt event.Replicator) {
     }
     s.mu.Unlock()
 
-    if isDeleteRep {
-        s.peer.host.Peerstore().ClearAddrs(evt.Info.ID)
-    } else {
-        s.peer.host.Peerstore().AddAddrs(evt.Info.ID, evt.Info.Addrs, peerstore.PermanentAddrTTL)
-    }
-
     if evt.Docs != nil {
         for update := range evt.Docs {
             if err := s.pushLog(s.peer.ctx, update, evt.Info.ID); err != nil {
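
The `updateReplicators` change is the behavioural core of this file: a replicator is now dialed as soon as it is added, instead of only having its addresses cached for a later push. A self-contained sketch of that add/remove pattern against a plain libp2p host; the `updateReplicatorPeer` helper name and the standard-library logger are stand-ins for the commit's internals:

```go
package sketch

import (
    "context"
    "log"

    "github.com/libp2p/go-libp2p/core/host"
    "github.com/libp2p/go-libp2p/core/peer"
    "github.com/libp2p/go-libp2p/core/peerstore"
)

// updateReplicatorPeer mirrors the shape of updateReplicators: removing a
// replicator clears its cached addresses; adding one stores its addresses
// permanently and eagerly opens a connection so later pushLog calls can
// reuse a live stream.
func updateReplicatorPeer(ctx context.Context, h host.Host, info peer.AddrInfo, removed bool) {
    if removed {
        h.Peerstore().ClearAddrs(info.ID)
        return
    }
    h.Peerstore().AddAddrs(info.ID, info.Addrs, peerstore.PermanentAddrTTL)
    if err := h.Connect(ctx, info); err != nil {
        // a failed dial is logged, not fatal: the peer may come online later
        log.Printf("failed to connect to replicator %s: %v", info.ID, err)
    }
}
```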
4 changes: 4 additions & 0 deletions tests/integration/acp/register_and_update_test.go
@@ -650,6 +650,8 @@ func TestACP_CreateWithIdentityAndUpdateWithoutIdentityGQL_CanNotUpdate(t *testi
                     "name": "Shahzad Lone"
                 }
             `,
+
+            SkipLocalUpdateEvent: true,
         },
 
         testUtils.Request{
@@ -764,6 +766,8 @@ func TestACP_CreateWithIdentityAndUpdateWithWrongIdentityGQL_CanNotUpdate(t *tes
                     "name": "Shahzad Lone"
                 }
             `,
+
+            SkipLocalUpdateEvent: true,
         },
 
         testUtils.Request{
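
For context, `SkipLocalUpdateEvent` tells the test harness not to block waiting for a local update event, which would otherwise time out here because ACP rejects the update. A hedged sketch of how the flag sits in a full test case, following the `testUtils` conventions visible in this diff; the schema setup is omitted and the surrounding field values are illustrative:

```go
package acp_test

import (
    "testing"

    testUtils "github.com/sourcenetwork/defradb/tests/integration"
)

// Sketch only (not the repository's actual test): the update is made
// without an identity, ACP denies it, and the harness is told not to
// wait for a local update event that will never be published.
func TestSketch_UpdateWithoutIdentitySkipsLocalEvent(t *testing.T) {
    test := testUtils.TestCase{
        Actions: []any{
            testUtils.UpdateDoc{
                DocID: 0, // illustrative: index of a previously created doc
                Doc: `
                    {
                        "name": "Shahzad Lone"
                    }
                `,
                SkipLocalUpdateEvent: true,
            },
        },
    }
    testUtils.ExecuteTestCase(t, test)
}
```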