feat: Added Peer-to-Peer networking data synchronization #177
Conversation
… libp2p protocol definition
… node Options system for configuring node behavior
…g type in their publish functions. Only MerkleCompositeDAG publish their updates to the pubsub net
…d protobuf definitions. Updated core/net/Broadcaster type. Implemented new Broadcaster type on net/peer. Breaking: Updated MerkleClock interface to have AddDAGNode return the newly created node. Updated protobuf definitions for net/pb
… peer bootstrapping, updated HTTP API to return errors, fixed pubsub push mechanics
… publisher now properly handling broadcast updates, updated collection create method to handle pubsub registering
…chemaID to CompositeDAGDelta objects. Update Merkle factory to include additional schema id parameter
Replicator allows a node to target another node to directly sync changes with. This is in comparison to the general pubsub network. Replicator is 'active' sync, while pubsub is 'passive' sync. Additionally, added a second tcp gRPC server for clients to interact with, and added a gRPC client utility for making RPC requests.
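For illustration, here is a rough sketch of how a client might target another node through that gRPC client utility. The `AddReplicator` signature comes from the diff below; the `NewClient` constructor, the import path, and the addresses are assumptions, not the actual API.

```go
package main

import (
	"context"
	"log"

	ma "github.com/multiformats/go-multiaddr"

	// Hypothetical import path for the gRPC client utility added in this PR.
	"github.com/sourcenetwork/defradb/net/api/client"
)

func main() {
	ctx := context.Background()

	// Multiaddr of the peer to replicate to. A real address would also carry
	// the target's /p2p/<peer-id> component; this is just a placeholder.
	target, err := ma.NewMultiaddr("/ip4/192.168.1.12/tcp/9171")
	if err != nil {
		log.Fatal(err)
	}

	// NewClient and the local RPC address are assumptions; the PR only shows
	// the AddReplicator method on the client type.
	c, err := client.NewClient("localhost:9161")
	if err != nil {
		log.Fatal(err)
	}

	// Ask the local node to actively push updates for the "users" collection
	// to the target peer ('active' sync, as opposed to passive pubsub sync).
	pid, err := c.AddReplicator(ctx, "users", target)
	if err != nil {
		log.Fatal(err)
	}
	log.Println("added replicator:", pid)
}
```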
var bs *broadcast.Broadcaster
if !config.Net.P2PDisabled {
	bs = broadcast.NewBroadcaster(busBufferSize)
	options = append(options, db.WithBroadcaster(bs))
:) The functional options pattern makes more sense here, it's nice.
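For readers unfamiliar with it, here is a minimal, self-contained sketch of the functional options pattern being praised here. `Broadcaster`, `DB`, and `NewDB` are simplified stand-ins, not the real DefraDB types.

```go
package main

import "fmt"

// Broadcaster is a stand-in for the real broadcast.Broadcaster type.
type Broadcaster struct{ buffer int }

// DB is a stand-in for the DefraDB db type.
type DB struct {
	bs *Broadcaster
}

// Option mutates the DB during construction.
type Option func(*DB)

// WithBroadcaster wires a broadcaster into the DB, mirroring the
// db.WithBroadcaster call in the diff above.
func WithBroadcaster(bs *Broadcaster) Option {
	return func(d *DB) { d.bs = bs }
}

// NewDB applies whatever options the caller supplies.
func NewDB(options ...Option) *DB {
	d := &DB{}
	for _, opt := range options {
		opt(d)
	}
	return d
}

func main() {
	var options []Option
	p2pDisabled := false
	if !p2pDisabled {
		options = append(options, WithBroadcaster(&Broadcaster{buffer: 100}))
	}
	db := NewDB(options...)
	fmt.Println("broadcaster configured:", db.bs != nil)
}
```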
if len(peers) != 0 {
	log.Debug("Parsing bootstrap peers: ", peers)
	addrs, err := netutils.ParsePeers(strings.Split(peers, ","))
if !config.Net.P2PDisabled {
Cheers :)
// 1 is create log, 2 is update log
var err error
if msg.Priority == 1 {
	err = p.handleDocCreateLog(msg)
Nice, cheers - this makes it quite a bit easier to tell what's what IMO, and it keeps the code in the same place.
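As a reference for the dispatch being discussed, here is a self-contained sketch of how the priority routing might look in full. The `Peer`/`Message` types and `handleDocUpdateLog` are inferred from the snippet above, not taken from the actual code.

```go
package main

import (
	"fmt"
	"log"
)

// Message and Peer are simplified stand-ins inferred from the diff above.
type Message struct {
	Priority uint64
	DocKey   string
}

type Peer struct{}

func (p *Peer) handleDocCreateLog(msg Message) error {
	fmt.Println("handling create log for", msg.DocKey)
	return nil
}

func (p *Peer) handleDocUpdateLog(msg Message) error {
	fmt.Println("handling update log for", msg.DocKey)
	return nil
}

// handleLog routes a broadcast message by priority:
// priority 1 is a document create log, priority 2 is a document update log.
func (p *Peer) handleLog(msg Message) error {
	switch msg.Priority {
	case 1:
		return p.handleDocCreateLog(msg)
	case 2:
		return p.handleDocUpdateLog(msg)
	default:
		return fmt.Errorf("unknown log priority: %d", msg.Priority)
	}
}

func main() {
	p := &Peer{}
	if err := p.handleLog(Message{Priority: 1, DocKey: "bae-123"}); err != nil {
		log.Fatal(err)
	}
}
```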
Looks good :D Looking forward to having it in and playing around with it (and figuring out how to integration test it lol)
net/api/client/client.go (Outdated)
// AddReplicator sends a request to add a target replicator to the DB peer
func (c *Client) AddReplicator(ctx context.Context, collection string, paddr ma.Multiaddr) (peer.ID, error) {
	var pid peer.ID
Minor, but this var is only used in the default case (on err) - suggest declaring it only in those scopes.
👍
// make sure we're not duplicating things
p.mu.Lock()
defer p.mu.Unlock()
(Minor, as it looks like this lock is not really performance sensitive, but this also impacts maintenance.) It is very unclear how long this lock actually needs to be held - is it simple to unlock earlier? I can't tell if it is only required for the next block or so, or for the whole func.
It can be moved down to line 271, after p.replicators[col.SchemaID()][pid] = struct{}{}, but because there are exit cases between the Lock and that line, I'd rather just do it on defer. And it's not like we're going to be spamming the replicator command. I find defer on a lock the safest, as it guarantees you don't accidentally deadlock yourself.
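To illustrate the trade-off, here is a simplified sketch (string peer IDs and a stand-in `Peer` struct, not the real net/peer type) showing why defer covers every exit path.

```go
package main

import (
	"fmt"
	"sync"
)

// Peer is a simplified stand-in for the net/peer type; peer IDs are plain
// strings here instead of libp2p peer.IDs.
type Peer struct {
	mu          sync.Mutex
	replicators map[string]map[string]struct{} // schemaID -> set of peer IDs
}

// addReplicator shows why defer is the safer choice here: there are early
// returns between Lock and the final map write, and defer covers them all.
func (p *Peer) addReplicator(schemaID, pid string) error {
	p.mu.Lock()
	defer p.mu.Unlock() // released on every return path below

	if _, exists := p.replicators[schemaID][pid]; exists {
		return fmt.Errorf("replicator %s already exists for schema %s", pid, schemaID)
	}
	if p.replicators[schemaID] == nil {
		p.replicators[schemaID] = make(map[string]struct{})
	}
	p.replicators[schemaID][pid] = struct{}{}
	// An explicit p.mu.Unlock() right here would also work, but any early
	// return added above it later would need to remember its own Unlock.
	return nil
}

func main() {
	p := &Peer{replicators: make(map[string]map[string]struct{})}
	fmt.Println(p.addReplicator("schema-1", "peer-A")) // <nil>
	fmt.Println(p.addReplicator("schema-1", "peer-A")) // duplicate error
}
```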
cli/defradb/cmd/start.go (Outdated)
}
server := grpc.NewServer(grpc.KeepaliveParams(keepalive.ServerParameters{
	MaxConnectionIdle: 5 * time.Minute, // <--- This fixes it!
Minor, but suggest comment removal
👍
go func() {
	log.Info("Started gRPC server, listening on %s", addr)
	netpb.RegisterServiceServer(server, netService)
	if err := server.Serve(tcplistener); err != nil && !errors.Is(err, grpc.ErrServerStopped) {
Quick sanity check: is it safe to not copy the tcpListener here?
Yea that's fine, Go will handle the copy behind the scenes; it's really only an issue in loops if you're overwriting a variable.
ah nice, thanks :)
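For completeness, the loop case mentioned here looks like this (behaviour prior to Go 1.22, where the range variable was shared across iterations); the listener names are placeholders.

```go
package main

import (
	"fmt"
	"sync"
)

func main() {
	listeners := []string{"tcp-0", "tcp-1", "tcp-2"}
	var wg sync.WaitGroup

	for _, l := range listeners {
		l := l // before Go 1.22: shadow so each goroutine gets its own copy
		wg.Add(1)
		go func() {
			defer wg.Done()
			fmt.Println("serving on", l)
		}()
	}
	wg.Wait()
}
```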
net/peer.go (Outdated)
ds:     ds,
bus:    bs,
p2pRPC: grpc.NewServer(serverOptions...),
// tcpRPC: grpc.NewServer(),
Suggest removal
net/peer.go (Outdated)
if err != nil {
	return err
}
// addr, err := utils.TCPAddrFromMultiAddr(p.tcpAddr)
Suggest removal
👍
net/peer.go (Outdated)
log.Fatal("Fatal tcp rpc serve error:", err) | ||
} | ||
}() | ||
// go func() { |
Suggest removal
👍
This adds the first iteration of the peer-to-peer (p2p) networking stack powered by LibP2P. Additionally, this starts the first iteration of the Peer API, which is a gRPC API exposed over a pubsub network.
* initial peer layout
* outline protobuf net package
* Added basic libp2p protobuf custom types
* scaffold peer/server types
* Added libp2p/gostream rpc dialers
* initial implementation of the p2p/net server object
* implemented NewPeer with grpc serve/listen over libp2p
* Added Broadcaster definition and linked into DB, MerkleCRDT, and CRDTFactory
* Updated db implementations to match updated core/client interfaces. Added some utility to base descriptions
* BREAKING: Changed how collections are stored internally, added namespace to collection key
* Implemented new client interface for AllCollections and AllDocKeys
* Re-architect MerkleDAG Broadcast and DB/Peer relationship
* Add Replicator system for p2p node
* Added rpc client to CLI and hooked up AddReplicator command
This adds the first iteration of the peer-to-peer (p2p) networking stack powered by LibP2P. Additionally, this starts the first iteration of the Peer API, which is a gRPC API exposed over a pubsub network. Closes #69

Architecture

This creates 3 new main structures: net/server, net/peer, and node/node.

net/server is the gRPC server which publishes and subscribes to RPC messages on the p2p gossipsub network.

net/peer encapsulates the net/server struct, handles DAG resolution, and maintains an internal broadcast channel to synchronize the rest of the DB system.

node/node combines both the net/peer and db/db objects into a unified DefraDB Node. It uses the ipfslite host as a "DAGSyncer" (a DAG network getter/setter), currently powered by the LibP2P DHT and Bitswap protocol.

Currently this needs more formal automated testing, which is notoriously difficult for this kind of networking system. More to come.
At the moment this only syncs documents after nodes have both subscribed to the same pubsub topic. The Replicator RPC command allows nodes to pre-emptively select which nodes to push/sync documents to. Documents are synced as their respective MerkleCRDT update graphs and ipld.Nodes.

Todo:
* Replicator options to automatically sync new documents to a given peer