diff --git a/net/peer.go b/net/peer.go
index 00ea8653a0..7222d7cf9f 100644
--- a/net/peer.go
+++ b/net/peer.go
@@ -24,6 +24,7 @@ import (
 	"github.com/ipfs/boxo/blockservice"
 	exchange "github.com/ipfs/boxo/exchange"
 	"github.com/ipfs/boxo/ipns"
+	blocks "github.com/ipfs/go-block-format"
 	"github.com/ipfs/go-cid"
 	ds "github.com/ipfs/go-datastore"
 	libp2p "github.com/libp2p/go-libp2p"
@@ -446,6 +447,13 @@ func (p *Peer) handleDocUpdateLog(evt event.Update) error {
 }
 
 func (p *Peer) pushLogToReplicators(lg event.Update) {
+	// let the exchange know we have this block
+	// this should speed up the dag sync process
+	err := p.bserv.Exchange().NotifyNewBlocks(p.ctx, blocks.NewBlock(lg.Block))
+	if err != nil {
+		log.ErrorContextE(p.ctx, "Failed to notify new blocks", err)
+	}
+
 	// push to each peer (replicator)
 	peers := make(map[string]struct{})
 	for _, peer := range p.ps.ListPeers(lg.DocID) {
@@ -504,6 +512,11 @@ func stopGRPCServer(ctx context.Context, server *grpc.Server) {
 	}
 }
 
+// Connect initiates a connection to the peer with the given address.
+func (p *Peer) Connect(ctx context.Context, addr peer.AddrInfo) error {
+	return p.host.Connect(ctx, addr)
+}
+
 // Bootstrap connects to the given peers.
 func (p *Peer) Bootstrap(addrs []peer.AddrInfo) {
 	var connected uint64
diff --git a/net/server.go b/net/server.go
index 3b4922fe5e..0e36eb7e3c 100644
--- a/net/server.go
+++ b/net/server.go
@@ -125,18 +125,24 @@ func (s *server) PushLog(ctx context.Context, req *pb.PushLogRequest) (*pb.PushL
 	if err != nil {
 		return nil, err
 	}
+
+	log.InfoContext(ctx, "Received pushlog",
+		corelog.Any("PeerID", pid.String()),
+		corelog.Any("Creator", byPeer.String()),
+		corelog.Any("DocID", docID.String()))
+
+	log.InfoContext(ctx, "Starting DAG sync",
+		corelog.Any("PeerID", pid.String()),
+		corelog.Any("DocID", docID.String()))
+
 	err = syncDAG(ctx, s.peer.bserv, block)
 	if err != nil {
 		return nil, err
 	}
 
-	s.peer.bus.Publish(event.NewMessage(event.MergeName, event.Merge{
-		DocID:      docID.String(),
-		ByPeer:     byPeer,
-		FromPeer:   pid,
-		Cid:        headCID,
-		SchemaRoot: string(req.Body.SchemaRoot),
-	}))
+	log.InfoContext(ctx, "DAG sync complete",
+		corelog.Any("PeerID", pid.String()),
+		corelog.Any("DocID", docID.String()))
 
 	// Once processed, subscribe to the DocID topic on the pubsub network unless we already
 	// subscribe to the collection.
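
Note on the `NotifyNewBlocks` call above: announcing a freshly created block to the exchange is the usual boxo/Bitswap pattern for making a block fetchable before any peer asks for it, which is what lets a replicator's `syncDAG` start pulling the new head immediately instead of waiting on provider discovery. A minimal sketch of the pattern in isolation, assuming any configured `blockservice.BlockService` (the `announceBlock` helper name is illustrative, not part of this change):

```go
package blocksync

import (
	"context"

	"github.com/ipfs/boxo/blockservice"
	blocks "github.com/ipfs/go-block-format"
)

// announceBlock tells the block service's exchange that a block now exists
// locally, so connected peers can fetch it immediately by CID.
func announceBlock(ctx context.Context, bserv blockservice.BlockService, payload []byte) error {
	blk := blocks.NewBlock(payload) // the CID is derived from the payload's hash
	return bserv.Exchange().NotifyNewBlocks(ctx, blk)
}
```
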
@@ -146,6 +152,15 @@ func (s *server) PushLog(ctx context.Context, req *pb.PushLogRequest) (*pb.PushL
 			return nil, err
 		}
 	}
+
+	s.peer.bus.Publish(event.NewMessage(event.MergeName, event.Merge{
+		DocID:      docID.String(),
+		ByPeer:     byPeer,
+		FromPeer:   pid,
+		Cid:        headCID,
+		SchemaRoot: string(req.Body.SchemaRoot),
+	}))
+
 	return &pb.PushLogReply{}, nil
 }
 
@@ -163,6 +178,10 @@ func (s *server) addPubSubTopic(topic string, subscribe bool) error {
 		return nil
 	}
 
+	log.InfoContext(s.peer.ctx, "Adding pubsub topic",
+		corelog.String("PeerID", s.peer.PeerID().String()),
+		corelog.String("Topic", topic))
+
 	s.mu.Lock()
 	defer s.mu.Unlock()
 	if t, ok := s.topics[topic]; ok {
@@ -205,6 +224,10 @@ func (s *server) removePubSubTopic(topic string) error {
 		return nil
 	}
 
+	log.InfoContext(s.peer.ctx, "Removing pubsub topic",
+		corelog.String("PeerID", s.peer.PeerID().String()),
+		corelog.String("Topic", topic))
+
 	s.mu.Lock()
 	defer s.mu.Unlock()
 	if t, ok := s.topics[topic]; ok {
@@ -218,6 +241,10 @@ func (s *server) removeAllPubsubTopics() error {
 	if s.peer.ps == nil {
 		return nil
 	}
+
+	log.InfoContext(s.peer.ctx, "Removing all pubsub topics",
+		corelog.String("PeerID", s.peer.PeerID().String()))
+
 	s.mu.Lock()
 	defer s.mu.Unlock()
 	for id, t := range s.topics {
@@ -232,6 +259,10 @@ func (s *server) removeAllPubsubTopics() error {
 // publishLog publishes the given PushLogRequest object on the PubSub network via the
 // corresponding topic
 func (s *server) publishLog(ctx context.Context, topic string, req *pb.PushLogRequest) error {
+	log.InfoContext(ctx, "Publish log",
+		corelog.String("PeerID", s.peer.PeerID().String()),
+		corelog.String("Topic", topic))
+
 	if s.peer.ps == nil { // skip if we aren't running with a pubsub net
 		return nil
 	}
@@ -259,12 +290,16 @@ func (s *server) publishLog(ctx context.Context, topic string, req *pb.PushLogRe
 
 // pubSubMessageHandler handles incoming PushLog messages from the pubsub network.
 func (s *server) pubSubMessageHandler(from libpeer.ID, topic string, msg []byte) ([]byte, error) {
+	log.InfoContext(s.peer.ctx, "Received new pubsub event",
+		corelog.String("PeerID", s.peer.PeerID().String()),
+		corelog.Any("SenderId", from),
+		corelog.String("Topic", topic))
+
 	req := new(pb.PushLogRequest)
 	if err := proto.Unmarshal(msg, req); err != nil {
 		log.ErrorContextE(s.peer.ctx, "Failed to unmarshal pubsub message %s", err)
 		return nil, err
 	}
-
 	ctx := grpcpeer.NewContext(s.peer.ctx, &grpcpeer.Peer{
 		Addr: addr{from},
 	})
@@ -276,9 +311,8 @@ func (s *server) pubSubMessageHandler(from libpeer.ID, topic string, msg []byte)
 
 // pubSubEventHandler logs events from the subscribed DocID topics.
 func (s *server) pubSubEventHandler(from libpeer.ID, topic string, msg []byte) {
-	log.InfoContext(
-		s.peer.ctx,
-		"Received new pubsub event",
+	log.InfoContext(s.peer.ctx, "Received new pubsub event",
+		corelog.String("PeerID", s.peer.PeerID().String()),
 		corelog.Any("SenderId", from),
 		corelog.String("Topic", topic),
 		corelog.String("Message", string(msg)),
@@ -329,7 +363,18 @@ func (s *server) updatePubSubTopics(evt event.P2PTopic) {
 }
 
 func (s *server) updateReplicators(evt event.Replicator) {
-	isDeleteRep := len(evt.Schemas) == 0
+	if len(evt.Schemas) == 0 {
+		// remove peer from store
+		s.peer.host.Peerstore().ClearAddrs(evt.Info.ID)
+	} else {
+		// add peer to store
+		s.peer.host.Peerstore().AddAddrs(evt.Info.ID, evt.Info.Addrs, peerstore.PermanentAddrTTL)
+		// connect to the peer
+		if err := s.peer.Connect(s.peer.ctx, evt.Info); err != nil {
+			log.ErrorContextE(s.peer.ctx, "Failed to connect to replicator peer", err)
+		}
+	}
+
 	// update the cached replicators
 	s.mu.Lock()
 	for schema, peers := range s.replicators {
@@ -350,12 +395,6 @@ func (s *server) updateReplicators(evt event.Replicator) {
 	}
 	s.mu.Unlock()
 
-	if isDeleteRep {
-		s.peer.host.Peerstore().ClearAddrs(evt.Info.ID)
-	} else {
-		s.peer.host.Peerstore().AddAddrs(evt.Info.ID, evt.Info.Addrs, peerstore.PermanentAddrTTL)
-	}
-
 	if evt.Docs != nil {
 		for update := range evt.Docs {
 			if err := s.pushLog(s.peer.ctx, update, evt.Info.ID); err != nil {
diff --git a/tests/integration/acp/register_and_update_test.go b/tests/integration/acp/register_and_update_test.go
index edfeb99c97..842ef1b754 100644
--- a/tests/integration/acp/register_and_update_test.go
+++ b/tests/integration/acp/register_and_update_test.go
@@ -650,6 +650,8 @@ func TestACP_CreateWithIdentityAndUpdateWithoutIdentityGQL_CanNotUpdate(t *testi
 					"name": "Shahzad Lone"
 				}
 			`,
+
+			SkipLocalUpdateEvent: true,
 		},
 
 		testUtils.Request{
@@ -764,6 +766,8 @@ func TestACP_CreateWithIdentityAndUpdateWithWrongIdentityGQL_CanNotUpdate(t *tes
 					"name": "Shahzad Lone"
 				}
 			`,
+
+			SkipLocalUpdateEvent: true,
 		},
 
 		testUtils.Request{
diff --git a/tests/integration/events.go b/tests/integration/events.go
new file mode 100644
index 0000000000..946c089d4a
--- /dev/null
+++ b/tests/integration/events.go
@@ -0,0 +1,242 @@
+// Copyright 2024 Democratized Data Foundation
+//
+// Use of this software is governed by the Business Source License
+// included in the file licenses/BSL.txt.
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0, included in the file
+// licenses/APL.txt.
+
+package tests
+
+import (
+	"time"
+
+	"github.com/sourcenetwork/immutable"
+	"github.com/stretchr/testify/require"
+
+	"github.com/sourcenetwork/defradb/event"
+)
+
+// eventTimeout is the amount of time to wait
+// for an event before timing out
+const eventTimeout = 1 * time.Second
+
+// waitForNetworkSetupEvents waits for p2p topic completed and
+// replicator completed events to be published on the local node event bus.
+func waitForNetworkSetupEvents(s *state, nodeID int) {
+	cols, err := s.nodes[nodeID].GetAllP2PCollections(s.ctx)
+	require.NoError(s.t, err)
+
+	reps, err := s.nodes[nodeID].GetAllReplicators(s.ctx)
+	require.NoError(s.t, err)
+
+	replicatorEvents := len(reps)
+	p2pTopicEvent := len(cols) > 0
+
+	for p2pTopicEvent || replicatorEvents > 0 {
+		select {
+		case _, ok := <-s.nodeEvents[nodeID].replicator.Message():
+			if !ok {
+				require.Fail(s.t, "subscription closed waiting for network setup events")
+			}
+			replicatorEvents--
+
+		case _, ok := <-s.nodeEvents[nodeID].p2pTopic.Message():
+			if !ok {
+				require.Fail(s.t, "subscription closed waiting for network setup events")
+			}
+			p2pTopicEvent = false
+
+		case <-time.After(eventTimeout):
+			s.t.Fatalf("timeout waiting for network setup events")
+		}
+	}
+}
+
+// waitForReplicatorConfigureEvent waits for a node to publish a
+// replicator completed event on the local event bus.
+//
+// Expected document heads will be updated for the targeted node.
+func waitForReplicatorConfigureEvent(s *state, cfg ConfigureReplicator) {
+	select {
+	case _, ok := <-s.nodeEvents[cfg.SourceNodeID].replicator.Message():
+		if !ok {
+			require.Fail(s.t, "subscription closed waiting for replicator event")
+		}
+
+	case <-time.After(eventTimeout):
+		require.Fail(s.t, "timeout waiting for replicator event")
+	}
+
+	// all previous documents should be merged on the subscriber node
+	for key, val := range s.nodeP2P[cfg.SourceNodeID].actualDocHeads {
+		s.nodeP2P[cfg.TargetNodeID].expectedDocHeads[key] = val
+	}
+
+	// update node connections and replicators
+	s.nodeP2P[cfg.TargetNodeID].connections[cfg.SourceNodeID] = struct{}{}
+	s.nodeP2P[cfg.SourceNodeID].connections[cfg.TargetNodeID] = struct{}{}
+	s.nodeP2P[cfg.SourceNodeID].replicators[cfg.TargetNodeID] = struct{}{}
+}
+
+// waitForReplicatorDeleteEvent waits for a node to publish a
+// replicator completed event on the local event bus.
+func waitForReplicatorDeleteEvent(s *state, cfg DeleteReplicator) {
+	select {
+	case _, ok := <-s.nodeEvents[cfg.SourceNodeID].replicator.Message():
+		if !ok {
+			require.Fail(s.t, "subscription closed waiting for replicator event")
+		}
+
+	case <-time.After(eventTimeout):
+		require.Fail(s.t, "timeout waiting for replicator event")
+	}
+
+	delete(s.nodeP2P[cfg.TargetNodeID].connections, cfg.SourceNodeID)
+	delete(s.nodeP2P[cfg.SourceNodeID].connections, cfg.TargetNodeID)
+	delete(s.nodeP2P[cfg.SourceNodeID].replicators, cfg.TargetNodeID)
+}
+
+// waitForSubscribeToCollectionEvent waits for a node to publish a
+// p2p topic completed event on the local event bus.
+//
+// Expected document heads will be updated for the subscriber node.
+func waitForSubscribeToCollectionEvent(s *state, action SubscribeToCollection) {
+	select {
+	case _, ok := <-s.nodeEvents[action.NodeID].p2pTopic.Message():
+		if !ok {
+			require.Fail(s.t, "subscription closed waiting for p2p topic event")
+		}
+
+	case <-time.After(eventTimeout):
+		require.Fail(s.t, "timeout waiting for p2p topic event")
+	}
+
+	// update peer collections of target node
+	for _, collectionIndex := range action.CollectionIDs {
+		if collectionIndex == NonExistentCollectionID {
+			continue // don't track non-existent collections
+		}
+		s.nodeP2P[action.NodeID].peerCollections[collectionIndex] = struct{}{}
+	}
+}
+
+// waitForUnsubscribeToCollectionEvent waits for a node to publish a
+// p2p topic completed event on the local event bus.
+func waitForUnsubscribeToCollectionEvent(s *state, action UnsubscribeToCollection) {
+	select {
+	case _, ok := <-s.nodeEvents[action.NodeID].p2pTopic.Message():
+		if !ok {
+			require.Fail(s.t, "subscription closed waiting for p2p topic event")
+		}
+
+	case <-time.After(eventTimeout):
+		require.Fail(s.t, "timeout waiting for p2p topic event")
+	}
+
+	for _, collectionIndex := range action.CollectionIDs {
+		if collectionIndex == NonExistentCollectionID {
+			continue // don't track non-existent collections
+		}
+		delete(s.nodeP2P[action.NodeID].peerCollections, collectionIndex)
+	}
+}
+
+// waitForUpdateEvents waits for all selected nodes to publish an
+// update event to the local event bus.
+//
+// Expected document heads will be updated for any connected nodes.
+func waitForUpdateEvents(s *state, nodeID immutable.Option[int], collectionID int) {
+	for i := 0; i < len(s.nodes); i++ {
+		if nodeID.HasValue() && nodeID.Value() != i {
+			continue // node is not selected
+		}
+
+		var evt event.Update
+		select {
+		case msg, ok := <-s.nodeEvents[i].update.Message():
+			if !ok {
+				require.Fail(s.t, "subscription closed waiting for update event")
+			}
+			evt = msg.Data.(event.Update)
+
+		case <-time.After(eventTimeout):
+			require.Fail(s.t, "timeout waiting for update event")
+		}
+
+		if i >= len(s.nodeConfigs) {
+			return // not testing network state
+		}
+
+		// make sure the event is published on the network before proceeding
+		// this prevents nodes from missing messages that are sent before
+		// subscriptions are set up
+		time.Sleep(100 * time.Millisecond)
+
+		// update the actual document head on the node that updated it
+		s.nodeP2P[i].actualDocHeads[evt.DocID] = evt.Cid
+
+		// update the expected document heads of replicator targets
+		for id := range s.nodeP2P[i].replicators {
+			// replicator source nodes push updates to target nodes
+			s.nodeP2P[id].expectedDocHeads[evt.DocID] = evt.Cid
+		}
+
+		// update the expected document heads of connected nodes
+		for id := range s.nodeP2P[i].connections {
+			// connected nodes share updates of documents they have in common
+			if _, ok := s.nodeP2P[id].actualDocHeads[evt.DocID]; ok {
+				s.nodeP2P[id].expectedDocHeads[evt.DocID] = evt.Cid
+			}
+			// peer collection subscribers receive updates from any other subscriber node
+			if _, ok := s.nodeP2P[id].peerCollections[collectionID]; ok {
+				s.nodeP2P[id].expectedDocHeads[evt.DocID] = evt.Cid
+			}
+		}
+	}
+}
+
+// waitForMergeEvents waits for all expected document heads to be merged on all nodes.
+//
+// Will fail the test if an event is not received within the expected time interval to prevent tests
+// from running forever.
+func waitForMergeEvents(s *state) {
+	for nodeID := 0; nodeID < len(s.nodes); nodeID++ {
+		expect := s.nodeP2P[nodeID].expectedDocHeads
+
+		// remove any docs that are already merged
+		// up to the expected document head
+		for key, val := range s.nodeP2P[nodeID].actualDocHeads {
+			if head, ok := expect[key]; ok && head.String() == val.String() {
+				delete(expect, key)
+			}
+		}
+
+		// wait for all expected doc heads to be merged
+		//
+		// the order of merges does not matter as we only
+		// expect the latest head to eventually be merged
+		//
+		// unexpected merge events are ignored
+		for len(expect) > 0 {
+			var evt event.Merge
+			select {
+			case msg, ok := <-s.nodeEvents[nodeID].merge.Message():
+				if !ok {
+					require.Fail(s.t, "subscription closed waiting for merge complete event")
+				}
+				evt = msg.Data.(event.Merge)
+
+			case <-time.After(30 * eventTimeout):
+				require.Fail(s.t, "timeout waiting for merge complete event")
+			}
+
+			head, ok := expect[evt.DocID]
+			if ok && head.String() == evt.Cid.String() {
+				delete(expect, evt.DocID)
+			}
+		}
+	}
+}
diff --git a/tests/integration/net/simple/peer/with_delete_test.go b/tests/integration/net/simple/peer/with_delete_test.go
index b0b5fe3ded..01f03e2c75 100644
--- a/tests/integration/net/simple/peer/with_delete_test.go
+++ b/tests/integration/net/simple/peer/with_delete_test.go
@@ -175,7 +175,6 @@ func TestP2PWithMultipleDocumentsWithSingleUpdateBeforeConnectSingleDeleteWithSh
 			Doc: `{
 				"Age": 60
 			}`,
-			DontSync: true,
 		},
 		testUtils.ConnectPeers{
 			SourceNodeID: 0,
@@ -247,7 +246,6 @@ func TestP2PWithMultipleDocumentsWithMultipleUpdatesBeforeConnectSingleDeleteWit
 			Doc: `{
 				"Age": 60
 			}`,
-			DontSync: true,
 		},
 		testUtils.UpdateDoc{
 			// Update John's Age on the first node only
@@ -256,7 +254,6 @@ func TestP2PWithMultipleDocumentsWithMultipleUpdatesBeforeConnectSingleDeleteWit
 			Doc: `{
 				"Age": 62
 			}`,
-			DontSync: true,
 		},
 		testUtils.ConnectPeers{
 			SourceNodeID: 0,
@@ -328,7 +325,6 @@ func TestP2PWithMultipleDocumentsWithUpdateAndDeleteBeforeConnectSingleDeleteWit
 			Doc: `{
 				"Age": 60
 			}`,
-			DontSync: true,
 		},
 		testUtils.UpdateDoc{
 			// Update John's Age on the first node only
@@ -337,12 +333,10 @@ func TestP2PWithMultipleDocumentsWithUpdateAndDeleteBeforeConnectSingleDeleteWit
 			Doc: `{
 				"Age": 62
 			}`,
-			DontSync: true,
 		},
 		testUtils.DeleteDoc{
-			NodeID:   immutable.Some(0),
-			DocID:    0,
-			DontSync: true,
+			NodeID: immutable.Some(0),
+			DocID:  0,
 		},
 		testUtils.ConnectPeers{
 			SourceNodeID: 0,
diff --git a/tests/integration/net/simple/replicator/with_create_test.go b/tests/integration/net/simple/replicator/with_create_test.go
index 5785337b98..08ef2baeec 100644
--- a/tests/integration/net/simple/replicator/with_create_test.go
+++ b/tests/integration/net/simple/replicator/with_create_test.go
@@ -12,7 +12,6 @@
 package replicator
 
 import (
 	"testing"
-	"time"
 
 	"github.com/sourcenetwork/immutable"
 
@@ -182,7 +181,6 @@ func TestP2POneToOneReplicatorDoesNotSyncFromDeletedReplicator(t *testing.T) {
 		},
 		testUtils.WaitForSync{
 			// No documents should be synced
-			ExpectedTimeout: 100 * time.Millisecond,
 		},
 		testUtils.Request{
 			// Assert that John has not been synced to the second (target) node
diff --git a/tests/integration/p2p.go b/tests/integration/p2p.go
index 4a57ac7a1a..b1b79982cf 100644
--- a/tests/integration/p2p.go
+++ b/tests/integration/p2p.go
@@ -14,7 +14,6 @@ import (
 	"time"
 
 	"github.com/sourcenetwork/defradb/client"
-	"github.com/sourcenetwork/defradb/event"
 	"github.com/sourcenetwork/defradb/net"
 
 	"github.com/libp2p/go-libp2p/core/peer"
@@ -135,10 +134,7 @@ type GetAllP2PCollections struct {
 //
 // For example you will likely wish to `WaitForSync` after creating a document in node 0 before querying
 // node 1 to see if it has been replicated.
-type WaitForSync struct {
-	// ExpectedTimeout is the duration to wait when expecting a timeout to occur.
-	ExpectedTimeout time.Duration
-}
+type WaitForSync struct{}
 
 // connectPeers connects two existing, started, nodes as peers. It returns a channel
 // that will receive an empty struct upon sync completion of all expected peer-sync events.
@@ -148,9 +144,6 @@ func connectPeers(
 	s *state,
 	cfg ConnectPeers,
 ) {
-	// If we have some database actions prior to connecting the peers, we want to ensure that they had time to
-	// complete before we connect. Otherwise we might wrongly catch them in our wait function.
-	time.Sleep(100 * time.Millisecond)
 	sourceNode := s.nodes[cfg.SourceNodeID]
 	targetNode := s.nodes[cfg.TargetNodeID]
 
@@ -158,119 +151,13 @@ func connectPeers(
 	log.InfoContext(s.ctx, "Bootstrapping with peers", corelog.Any("Addresses", addrs))
 	sourceNode.Bootstrap(addrs)
 
+	s.nodeP2P[cfg.SourceNodeID].connections[cfg.TargetNodeID] = struct{}{}
+	s.nodeP2P[cfg.TargetNodeID].connections[cfg.SourceNodeID] = struct{}{}
+
 	// Bootstrap triggers a bunch of async stuff for which we have no good way of waiting on. It must be
 	// allowed to complete before document creation begins or it will not even try to sync it. So for now, we
 	// sleep a little.
 	time.Sleep(100 * time.Millisecond)
-	setupPeerWaitSync(s, 0, cfg)
-}
-
-func setupPeerWaitSync(
-	s *state,
-	startIndex int,
-	cfg ConnectPeers,
-) {
-	sourceToTargetEvents := []int{0}
-	targetToSourceEvents := []int{0}
-
-	nodeCollections := map[int][]int{}
-	waitIndex := 0
-	for i := startIndex; i < len(s.testCase.Actions); i++ {
-		switch action := s.testCase.Actions[i].(type) {
-		case SubscribeToCollection:
-			if action.ExpectedError != "" {
-				// If the subscription action is expected to error, then we should do nothing here.
-				continue
-			}
-			// This is order dependent, items should be added in the same action-loop that reads them
-			// as 'stuff' done before collection subscription should not be synced.
-			nodeCollections[action.NodeID] = append(nodeCollections[action.NodeID], action.CollectionIDs...)
-
-		case UnsubscribeToCollection:
-			if action.ExpectedError != "" {
-				// If the unsubscribe action is expected to error, then we should do nothing here.
-				continue
-			}
-
-			// This is order dependent, items should be added in the same action-loop that reads them
-			// as 'stuff' done before collection subscription should not be synced.
-			existingCollectionIndexes := nodeCollections[action.NodeID]
-			for _, collectionIndex := range action.CollectionIDs {
-				for i, existingCollectionIndex := range existingCollectionIndexes {
-					if collectionIndex == existingCollectionIndex {
-						// Remove the matching collection index from the set:
-						existingCollectionIndexes = append(existingCollectionIndexes[:i], existingCollectionIndexes[i+1:]...)
-					}
-				}
-			}
-			nodeCollections[action.NodeID] = existingCollectionIndexes
-
-		case CreateDoc:
-			sourceCollectionSubscribed := collectionSubscribedTo(nodeCollections, cfg.SourceNodeID, action.CollectionID)
-			targetCollectionSubscribed := collectionSubscribedTo(nodeCollections, cfg.TargetNodeID, action.CollectionID)
-
-			// Peers sync trigger sync events for documents that exist prior to configuration, even if they already
-			// exist at the destination, so we need to wait for documents created on all nodes, as well as those
-			// created on the target.
-			if (!action.NodeID.HasValue() ||
-				action.NodeID.Value() == cfg.TargetNodeID) &&
-				sourceCollectionSubscribed {
-				targetToSourceEvents[waitIndex] += 1
-			}
-
-			// Peers sync trigger sync events for documents that exist prior to configuration, even if they already
-			// exist at the destination, so we need to wait for documents created on all nodes, as well as those
-			// created on the source.
-			if (!action.NodeID.HasValue() ||
-				action.NodeID.Value() == cfg.SourceNodeID) &&
-				targetCollectionSubscribed {
-				sourceToTargetEvents[waitIndex] += 1
-			}
-
-		case DeleteDoc:
-			// Updates to existing docs should always sync (no-sub required)
-			if !action.DontSync && action.NodeID.HasValue() && action.NodeID.Value() == cfg.TargetNodeID {
-				targetToSourceEvents[waitIndex] += 1
-			}
-			if !action.DontSync && action.NodeID.HasValue() && action.NodeID.Value() == cfg.SourceNodeID {
-				sourceToTargetEvents[waitIndex] += 1
-			}
-
-		case UpdateDoc:
-			// Updates to existing docs should always sync (no-sub required)
-			if !action.DontSync && action.NodeID.HasValue() && action.NodeID.Value() == cfg.TargetNodeID {
-				targetToSourceEvents[waitIndex] += 1
-			}
-			if !action.DontSync && action.NodeID.HasValue() && action.NodeID.Value() == cfg.SourceNodeID {
-				sourceToTargetEvents[waitIndex] += 1
-			}
-
-		case WaitForSync:
-			waitIndex += 1
-			targetToSourceEvents = append(targetToSourceEvents, 0)
-			sourceToTargetEvents = append(sourceToTargetEvents, 0)
-		}
-	}
-
-	nodeSynced := make(chan struct{})
-	go waitForMerge(s, cfg.SourceNodeID, cfg.TargetNodeID, sourceToTargetEvents, targetToSourceEvents, nodeSynced)
-	s.syncChans = append(s.syncChans, nodeSynced)
-}
-
-// collectionSubscribedTo returns true if the collection on the given node
-// has been subscribed to.
-func collectionSubscribedTo(
-	nodeCollections map[int][]int,
-	nodeID int,
-	collectionID int,
-) bool {
-	targetSubscriptionCollections := nodeCollections[nodeID]
-	for _, collectionId := range targetSubscriptionCollections {
-		if collectionId == collectionID {
-			return true
-		}
-	}
-	return false
-}
 
 // configureReplicator configures a replicator relationship between two existing, started, nodes.
@@ -282,26 +169,18 @@ func configureReplicator(
 	s *state,
 	cfg ConfigureReplicator,
 ) {
-	// If we have some database actions prior to configuring the replicator, we want to ensure that they had time to
-	// complete before the configuration. Otherwise we might wrongly catch them in our wait function.
-	time.Sleep(100 * time.Millisecond)
 	sourceNode := s.nodes[cfg.SourceNodeID]
 	targetNode := s.nodes[cfg.TargetNodeID]
 
-	sub, err := sourceNode.Events().Subscribe(event.ReplicatorCompletedName)
-	require.NoError(s.t, err)
-	err = sourceNode.SetReplicator(s.ctx, client.Replicator{
+	err := sourceNode.SetReplicator(s.ctx, client.Replicator{
 		Info: targetNode.PeerInfo(),
 	})
-	if err == nil {
-		// wait for the replicator setup to complete
-		<-sub.Message()
-	}
 
 	expectedErrorRaised := AssertError(s.t, s.testCase.Description, err, cfg.ExpectedError)
 	assertExpectedErrorRaised(s.t, s.testCase.Description, cfg.ExpectedError, expectedErrorRaised)
+
 	if err == nil {
-		setupReplicatorWaitSync(s, 0, cfg)
+		waitForReplicatorConfigureEvent(s, cfg)
 	}
 }
 
@@ -312,74 +191,11 @@ func deleteReplicator(
 	sourceNode := s.nodes[cfg.SourceNodeID]
 	targetNode := s.nodes[cfg.TargetNodeID]
 
-	sub, err := sourceNode.Events().Subscribe(event.ReplicatorCompletedName)
-	require.NoError(s.t, err)
-	err = sourceNode.DeleteReplicator(s.ctx, client.Replicator{
+	err := sourceNode.DeleteReplicator(s.ctx, client.Replicator{
 		Info: targetNode.PeerInfo(),
 	})
-	if err == nil {
-		// wait for the replicator setup to complete
-		<-sub.Message()
-	}
 	require.NoError(s.t, err)
-}
-
-func setupReplicatorWaitSync(
-	s *state,
-	startIndex int,
-	cfg ConfigureReplicator,
-) {
-	sourceToTargetEvents := []int{0}
-	targetToSourceEvents := []int{0}
-
-	docIDsSyncedToSource := map[int]struct{}{}
-	waitIndex := 0
-	currentDocID := 0
-	for i := startIndex; i < len(s.testCase.Actions); i++ {
-		switch action := s.testCase.Actions[i].(type) {
-		case CreateDoc:
-			if !action.NodeID.HasValue() || action.NodeID.Value() == cfg.SourceNodeID {
-				docIDsSyncedToSource[currentDocID] = struct{}{}
-			}
-
-			// A document created on the source or one that is created on all nodes will be sent to the target even
-			// it already has it. It will create a `received push log` event on the target which we need to wait for.
-			if !action.NodeID.HasValue() || action.NodeID.Value() == cfg.SourceNodeID {
-				sourceToTargetEvents[waitIndex] += 1
-			}
-
-			currentDocID++
-
-		case DeleteDoc:
-			if _, shouldSyncFromTarget := docIDsSyncedToSource[action.DocID]; shouldSyncFromTarget &&
-				action.NodeID.HasValue() && action.NodeID.Value() == cfg.TargetNodeID {
-				targetToSourceEvents[waitIndex] += 1
-			}
-
-			if action.NodeID.HasValue() && action.NodeID.Value() == cfg.SourceNodeID {
-				sourceToTargetEvents[waitIndex] += 1
-			}
-
-		case UpdateDoc:
-			if _, shouldSyncFromTarget := docIDsSyncedToSource[action.DocID]; shouldSyncFromTarget &&
-				action.NodeID.HasValue() && action.NodeID.Value() == cfg.TargetNodeID {
-				targetToSourceEvents[waitIndex] += 1
-			}
-
-			if action.NodeID.HasValue() && action.NodeID.Value() == cfg.SourceNodeID {
-				sourceToTargetEvents[waitIndex] += 1
-			}
-
-		case WaitForSync:
-			waitIndex += 1
-			targetToSourceEvents = append(targetToSourceEvents, 0)
-			sourceToTargetEvents = append(sourceToTargetEvents, 0)
-		}
-	}
-
-	nodeSynced := make(chan struct{})
-	go waitForMerge(s, cfg.SourceNodeID, cfg.TargetNodeID, sourceToTargetEvents, targetToSourceEvents, nodeSynced)
-	s.syncChans = append(s.syncChans, nodeSynced)
+	waitForReplicatorDeleteEvent(s, cfg)
 }
 
 // subscribeToCollection sets up a collection subscription on the given node/collection.
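
All of the new wait helpers in events.go reduce to the same select-with-timeout shape: block on a subscription channel, fail if the subscription closes, and fail fast on timeout rather than letting the suite hang. A condensed sketch of that shape, using the `event.Subscription` and `event.Message` types visible in this diff (the `waitForEvent` helper name is illustrative):

```go
package tests

import (
	"testing"
	"time"

	"github.com/stretchr/testify/require"

	"github.com/sourcenetwork/defradb/event"
)

// waitForEvent blocks until the subscription delivers one message, the
// subscription closes, or the timeout elapses.
func waitForEvent(t *testing.T, sub *event.Subscription, timeout time.Duration) event.Message {
	select {
	case msg, ok := <-sub.Message():
		if !ok {
			require.Fail(t, "subscription closed while waiting for event")
		}
		return msg
	case <-time.After(timeout):
		require.Fail(t, "timeout waiting for event")
	}
	return event.Message{} // unreachable: require.Fail stops the test
}
```
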
@@ -402,13 +218,9 @@ func subscribeToCollection(
 		schemaRoots = append(schemaRoots, col.SchemaRoot())
 	}
 
-	sub, err := n.Events().Subscribe(event.P2PTopicCompletedName)
-	require.NoError(s.t, err)
-
-	err = n.AddP2PCollections(s.ctx, schemaRoots)
+	err := n.AddP2PCollections(s.ctx, schemaRoots)
 	if err == nil {
-		// wait for the p2p collection setup to complete
-		<-sub.Message()
+		waitForSubscribeToCollectionEvent(s, action)
 	}
 
 	expectedErrorRaised := AssertError(s.t, s.testCase.Description, err, action.ExpectedError)
@@ -440,13 +252,9 @@ func unsubscribeToCollection(
 		schemaRoots = append(schemaRoots, col.SchemaRoot())
 	}
 
-	sub, err := n.Events().Subscribe(event.P2PTopicCompletedName)
-	require.NoError(s.t, err)
-
-	err = n.RemoveP2PCollections(s.ctx, schemaRoots)
+	err := n.RemoveP2PCollections(s.ctx, schemaRoots)
 	if err == nil {
-		// wait for the p2p collection setup to complete
-		<-sub.Message()
+		waitForUnsubscribeToCollectionEvent(s, action)
 	}
 
 	expectedErrorRaised := AssertError(s.t, s.testCase.Description, err, action.ExpectedError)
@@ -479,83 +287,6 @@ func getAllP2PCollections(
 	assert.Equal(s.t, expectedCollections, cols)
 }
 
-// waitForSync waits for all given wait channels to receive an item signaling completion.
-//
-// Will fail the test if an event is not received within the expected time interval to prevent tests
-// from running forever.
-func waitForSync(
-	s *state,
-	action WaitForSync,
-) {
-	var timeout time.Duration
-	if action.ExpectedTimeout != 0 {
-		timeout = action.ExpectedTimeout
-	} else {
-		timeout = subscriptionTimeout * 10
-	}
-
-	for _, resultsChan := range s.syncChans {
-		select {
-		case <-resultsChan:
-			assert.True(
-				s.t,
-				action.ExpectedTimeout == 0,
-				"unexpected document has been synced",
-				s.testCase.Description,
-			)
-
-		// a safety in case the stream hangs - we don't want the tests to run forever.
-		case <-time.After(timeout):
-			assert.True(
-				s.t,
-				action.ExpectedTimeout != 0,
-				"timeout occurred while waiting for data stream",
-				s.testCase.Description,
-			)
-		}
-	}
-}
-
-// waitForMerge waits for the source and target nodes to synchronize their state
-// by listening to merge events sent from the network subsystem on the event bus.
-//
-// sourceToTargetEvents and targetToSourceEvents are slices containing the number
-// of expected merge events to be received after each test action has executed.
-func waitForMerge(
-	s *state,
-	sourceNodeID int,
-	targetNodeID int,
-	sourceToTargetEvents []int,
-	targetToSourceEvents []int,
-	nodeSynced chan struct{},
-) {
-	sourceSub := s.eventSubs[sourceNodeID]
-	targetSub := s.eventSubs[targetNodeID]
-
-	sourcePeerInfo := s.nodeAddresses[sourceNodeID]
-	targetPeerInfo := s.nodeAddresses[targetNodeID]
-
-	for waitIndex := 0; waitIndex < len(sourceToTargetEvents); waitIndex++ {
-		for i := 0; i < targetToSourceEvents[waitIndex]; i++ {
-			// wait for message or unsubscribe
-			msg, ok := <-sourceSub.Message()
-			if ok {
-				// ensure the message is sent from the target node
-				require.Equal(s.t, targetPeerInfo.ID, msg.Data.(event.Merge).ByPeer)
-			}
-		}
-		for i := 0; i < sourceToTargetEvents[waitIndex]; i++ {
-			// wait for message or unsubscribe
-			msg, ok := <-targetSub.Message()
-			if ok {
-				// ensure the message is sent from the source node
-				require.Equal(s.t, sourcePeerInfo.ID, msg.Data.(event.Merge).ByPeer)
-			}
-		}
-		nodeSynced <- struct{}{}
-	}
-}
-
 func RandomNetworkingConfig() ConfigureNode {
 	return func() []net.NodeOpt {
 		return []net.NodeOpt{
diff --git a/tests/integration/state.go b/tests/integration/state.go
index fd9bd6b12c..613462d40f 100644
--- a/tests/integration/state.go
+++ b/tests/integration/state.go
@@ -14,6 +14,7 @@ import (
 	"context"
 	"testing"
 
+	"github.com/ipfs/go-cid"
 	"github.com/libp2p/go-libp2p/core/peer"
 
 	identity "github.com/sourcenetwork/defradb/acp/identity"
@@ -24,6 +25,86 @@ import (
 	"github.com/sourcenetwork/defradb/tests/clients"
 )
 
+// p2pState contains all p2p related testing state.
+type p2pState struct {
+	// connections contains all connected nodes.
+	//
+	// The map key is the connected node id.
+	connections map[int]struct{}
+
+	// replicators is a mapping of replicator targets.
+	//
+	// The map key is the target node id.
+	replicators map[int]struct{}
+
+	// peerCollections contains all active peer collection subscriptions.
+	//
+	// The map key is the collection id.
+	peerCollections map[int]struct{}
+
+	// actualDocHeads contains all document heads that exist on a node.
+	//
+	// The map key is the doc id. The map value is the doc head.
+	actualDocHeads map[string]cid.Cid
+
+	// expectedDocHeads contains all document heads that are expected to exist on a node.
+	//
+	// The map key is the doc id. The map value is the doc head.
+	expectedDocHeads map[string]cid.Cid
+}
+
+// newP2PState returns a new empty p2p state.
+func newP2PState() *p2pState {
+	return &p2pState{
+		connections:      make(map[int]struct{}),
+		replicators:      make(map[int]struct{}),
+		peerCollections:  make(map[int]struct{}),
+		actualDocHeads:   make(map[string]cid.Cid),
+		expectedDocHeads: make(map[string]cid.Cid),
+	}
+}
+
+// eventState contains all event related testing state for a node.
+type eventState struct {
+	// merge is the `event.MergeCompleteName` subscription
+	merge *event.Subscription
+
+	// update is the `event.UpdateName` subscription
+	update *event.Subscription
+
+	// replicator is the `event.ReplicatorCompletedName` subscription
+	replicator *event.Subscription
+
+	// p2pTopic is the `event.P2PTopicCompletedName` subscription
+	p2pTopic *event.Subscription
+}
+
+// newEventState returns an eventState with all required subscriptions.
+func newEventState(bus *event.Bus) (*eventState, error) {
+	merge, err := bus.Subscribe(event.MergeCompleteName)
+	if err != nil {
+		return nil, err
+	}
+	update, err := bus.Subscribe(event.UpdateName)
+	if err != nil {
+		return nil, err
+	}
+	replicator, err := bus.Subscribe(event.ReplicatorCompletedName)
+	if err != nil {
+		return nil, err
+	}
+	p2pTopic, err := bus.Subscribe(event.P2PTopicCompletedName)
+	if err != nil {
+		return nil, err
+	}
+	return &eventState{
+		merge:      merge,
+		update:     update,
+		replicator: replicator,
+		p2pTopic:   p2pTopic,
+	}, nil
+}
+
 type state struct {
 	// The test context.
 	ctx context.Context
@@ -54,11 +135,8 @@ type state struct {
 	// These channels will receive a function which asserts results of any subscription requests.
 	subscriptionResultsChans []chan func()
 
-	// These synchronisation channels allow async actions to track their completion.
-	syncChans []chan struct{}
-
-	// eventSubs is a list of all event subscriptions
-	eventSubs []*event.Subscription
+	// nodeEvents contains all node event subscriptions.
+	nodeEvents []*eventState
 
 	// The addresses of any nodes configured.
 	nodeAddresses []peer.AddrInfo
@@ -69,6 +147,9 @@ type state struct {
 	// The nodes active in this test.
 	nodes []clients.Client
 
+	// nodeP2P contains p2p states for all nodes.
+	nodeP2P []*p2pState
+
 	// The paths to any file-based databases active in this test.
 	dbPaths []string
 
@@ -114,10 +195,10 @@ func newState(
 		txns:                     []datastore.Txn{},
 		allActionsDone:           make(chan struct{}),
 		subscriptionResultsChans: []chan func(){},
-		syncChans:                []chan struct{}{},
-		eventSubs:                []*event.Subscription{},
+		nodeEvents:               []*eventState{},
 		nodeAddresses:            []peer.AddrInfo{},
 		nodeConfigs:              [][]net.NodeOpt{},
+		nodeP2P:                  []*p2pState{},
 		nodes:                    []clients.Client{},
 		dbPaths:                  []string{},
 		collections:              [][]client.Collection{},
diff --git a/tests/integration/test_case.go b/tests/integration/test_case.go
index 0013ceab0c..1ebf396253 100644
--- a/tests/integration/test_case.go
+++ b/tests/integration/test_case.go
@@ -318,9 +318,6 @@ type DeleteDoc struct {
 	// String can be a partial, and the test will pass if an error is returned that
 	// contains this string.
 	ExpectedError string
-
-	// Setting DontSync to true will prevent waiting for that delete.
-	DontSync bool
 }
 
 // UpdateDoc will attempt to update the given document using the set [MutationType].
@@ -356,8 +353,11 @@ type UpdateDoc struct {
 	// contains this string.
 	ExpectedError string
 
-	// Setting DontSync to true will prevent waiting for that update.
-	DontSync bool
+	// SkipLocalUpdateEvent skips waiting for an update event on the local event bus.
+	//
+	// This should only be used for tests that do not correctly
+	// publish an update event to the local event bus.
+	SkipLocalUpdateEvent bool
 }
 
 // IndexField describes a field to be indexed.
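
The `p2pState` bookkeeping added in state.go is what drives `waitForMergeEvents`: update events record actual heads on the writing node and expected heads on its peers, and a node is considered synced once every expected head matches what has actually been merged. A small illustration of that convergence check (the `synced` helper is hypothetical, not part of this diff):

```go
package tests

import "github.com/ipfs/go-cid"

// synced reports whether every expected document head has been merged,
// mirroring the per-event comparison waitForMergeEvents performs.
func synced(expected, actual map[string]cid.Cid) bool {
	for docID, want := range expected {
		got, ok := actual[docID]
		if !ok || got.String() != want.String() {
			return false
		}
	}
	return true
}
```
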
diff --git a/tests/integration/utils2.go b/tests/integration/utils2.go
index ad6ef2a85d..67ed5dd3d1 100644
--- a/tests/integration/utils2.go
+++ b/tests/integration/utils2.go
@@ -33,7 +33,6 @@ import (
 	"github.com/sourcenetwork/defradb/crypto"
 	"github.com/sourcenetwork/defradb/datastore"
 	"github.com/sourcenetwork/defradb/errors"
-	"github.com/sourcenetwork/defradb/event"
 	"github.com/sourcenetwork/defradb/internal/db"
 	"github.com/sourcenetwork/defradb/internal/encryption"
 	"github.com/sourcenetwork/defradb/internal/request/graphql"
@@ -253,7 +252,7 @@ func performAction(
 		configureNode(s, action)
 
 	case Restart:
-		restartNodes(s, actionIndex)
+		restartNodes(s)
 
 	case ConnectPeers:
 		connectPeers(s, action)
@@ -343,7 +342,7 @@ func performAction(
 		assertClientIntrospectionResults(s, action)
 
 	case WaitForSync:
-		waitForSync(s, action)
+		waitForMergeEvents(s)
 
 	case Benchmark:
 		benchmarkAction(s, actionIndex, action)
@@ -658,14 +657,18 @@ func setStartingNodes(
 		c, err := setupClient(s, node)
 		require.Nil(s.t, err)
 
+		eventState, err := newEventState(c.Events())
+		require.NoError(s.t, err)
+
 		s.nodes = append(s.nodes, c)
+		s.nodeEvents = append(s.nodeEvents, eventState)
+		s.nodeP2P = append(s.nodeP2P, newP2PState())
 		s.dbPaths = append(s.dbPaths, path)
 	}
 }
 
 func restartNodes(
 	s *state,
-	actionIndex int,
 ) {
 	if s.dbt == badgerIMType || s.dbt == defraIMType {
 		return
@@ -686,24 +689,12 @@ func restartNodes(
 			c, err := setupClient(s, node)
 			require.NoError(s.t, err)
 			s.nodes[i] = c
-			continue
-		}
 
-		// We need to ensure that on restart, the node pubsub is configured before
-		// we continue with the test. Otherwise, we may miss update events.
-		readySub, err := node.DB.Events().Subscribe(event.P2PTopicCompletedName, event.ReplicatorCompletedName)
-		require.NoError(s.t, err)
-		waitLen := 0
-		cols, err := node.DB.GetAllP2PCollections(s.ctx)
-		require.NoError(s.t, err)
-		if len(cols) > 0 {
-			// there is only one message for loading of P2P collections
-			waitLen++
+			eventState, err := newEventState(c.Events())
+			require.NoError(s.t, err)
+			s.nodeEvents[i] = eventState
+			continue
 		}
-		reps, err := node.DB.GetAllReplicators(s.ctx)
-		require.NoError(s.t, err)
-		// there is one message per replicator
-		waitLen += len(reps)
 
 		// We need to make sure the node is configured with its old address, otherwise
 		// a new one may be selected and reconnection to it will fail.
@@ -728,45 +719,11 @@ func restartNodes(
 		require.NoError(s.t, err)
 		s.nodes[i] = c
 
-		// subscribe to merge complete events
-		sub, err := c.Events().Subscribe(event.MergeCompleteName)
+		eventState, err := newEventState(c.Events())
 		require.NoError(s.t, err)
-		s.eventSubs[i] = sub
-		for waitLen > 0 {
-			select {
-			case <-readySub.Message():
-				waitLen--
-			case <-time.After(10 * time.Second):
-				s.t.Fatalf("timeout waiting for node to be ready")
-			}
-		}
-	}
-
-	// The index of the action after the last wait action before the current restart action.
-	// We wish to resume the wait clock from this point onwards.
-	waitGroupStartIndex := 0
-actionLoop:
-	for i := actionIndex; i >= 0; i-- {
-		switch s.testCase.Actions[i].(type) {
-		case WaitForSync:
-			// +1 as we do not wish to resume from the wait itself, but the next action
-			// following it. This may be the current restart action.
-			waitGroupStartIndex = i + 1
-			break actionLoop
-		}
-	}
+		s.nodeEvents[i] = eventState
 
-	for _, tc := range s.testCase.Actions {
-		switch action := tc.(type) {
-		case ConnectPeers:
-			// Give the nodes a chance to connect to each other and learn about each other's subscribed topics.
-			time.Sleep(100 * time.Millisecond)
-			setupPeerWaitSync(s, waitGroupStartIndex, action)
-		case ConfigureReplicator:
-			// Give the nodes a chance to connect to each other and learn about each other's subscribed topics.
-			time.Sleep(100 * time.Millisecond)
-			setupReplicatorWaitSync(s, waitGroupStartIndex, action)
-		}
+		waitForNetworkSetupEvents(s, i)
 	}
 
 	// If the db was restarted we need to refresh the collection definitions as the old instances
@@ -840,13 +797,13 @@ func configureNode(
 	c, err := setupClient(s, node)
 	require.NoError(s.t, err)
 
+	eventState, err := newEventState(c.Events())
+	require.NoError(s.t, err)
+
 	s.nodes = append(s.nodes, c)
+	s.nodeEvents = append(s.nodeEvents, eventState)
+	s.nodeP2P = append(s.nodeP2P, newP2PState())
 	s.dbPaths = append(s.dbPaths, path)
-
-	// subscribe to merge complete events
-	sub, err := c.Events().Subscribe(event.MergeCompleteName)
-	require.NoError(s.t, err)
-	s.eventSubs = append(s.eventSubs, sub)
 }
 
 func refreshDocuments(
@@ -1224,6 +1181,12 @@ func createDoc(
 		s.documents = append(s.documents, make([][]*client.Document, action.CollectionID-len(s.documents)+1)...)
 	}
 	s.documents[action.CollectionID] = append(s.documents[action.CollectionID], docs...)
+
+	if action.ExpectedError == "" {
+		for i := 0; i < len(docs); i++ {
+			waitForUpdateEvents(s, action.NodeID, action.CollectionID)
+		}
+	}
 }
 
 func createDocViaColSave(
@@ -1430,6 +1393,10 @@ func deleteDoc(
 	}
 
 	assertExpectedErrorRaised(s.t, s.testCase.Description, action.ExpectedError, expectedErrorRaised)
+
+	if action.ExpectedError == "" {
+		waitForUpdateEvents(s, action.NodeID, action.CollectionID)
+	}
 }
 
 // updateDoc updates a document using the chosen [mutationType].
@@ -1462,6 +1429,10 @@ func updateDoc(
 	}
 
 	assertExpectedErrorRaised(s.t, s.testCase.Description, action.ExpectedError, expectedErrorRaised)
+
+	if action.ExpectedError == "" && !action.SkipLocalUpdateEvent {
+		waitForUpdateEvents(s, action.NodeID, action.CollectionID)
+	}
 }
 
 func updateDocViaColSave(
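
With these changes a networked integration test no longer declares timeouts or per-action sync hints; it performs its actions and ends with a bare `WaitForSync`, which blocks until every node's expected document heads (tracked from update events) have been merged. A hypothetical end-to-end sketch composed from the action types touched by this diff (the schema, field values, and `SchemaUpdate`/`ExecuteTestCase` usage follow the repo's existing test conventions rather than anything introduced here):

```go
package example_test

import (
	"testing"

	"github.com/sourcenetwork/immutable"

	testUtils "github.com/sourcenetwork/defradb/tests/integration"
)

func TestP2PCreateSyncsToSubscribedPeer(t *testing.T) {
	test := testUtils.TestCase{
		Actions: []any{
			testUtils.RandomNetworkingConfig(), // node 0
			testUtils.RandomNetworkingConfig(), // node 1
			testUtils.SchemaUpdate{Schema: `
				type Users {
					name: String
				}
			`},
			testUtils.ConnectPeers{SourceNodeID: 0, TargetNodeID: 1},
			testUtils.SubscribeToCollection{NodeID: 1, CollectionIDs: []int{0}},
			// createDoc now blocks until the local update event is published,
			// so the head is tracked before the next action runs.
			testUtils.CreateDoc{NodeID: immutable.Some(0), Doc: `{"name": "John"}`},
			// Blocks until node 1 publishes a merge-complete event for the head.
			testUtils.WaitForSync{},
		},
	}
	testUtils.ExecuteTestCase(t, test)
}
```
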