Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Added Peer-to-Peer networking data synchronization #177

Merged
merged 59 commits into from
Feb 7, 2022
Merged
Show file tree
Hide file tree
Changes from 54 commits
Commits
Show all changes
59 commits
Select commit Hold shift + click to select a range
4216430
peer stuff
jsimnz Jul 9, 2021
241990b
moved net package (forgot new files xD)
jsimnz Jul 21, 2021
c3946ee
outline protobuf net package
jsimnz Aug 4, 2021
1603294
Added basic libp2p protobuf custom types
jsimnz Aug 5, 2021
466a718
Added doc to net package with high high high level description. Added…
jsimnz Aug 10, 2021
e344f68
scaffold peer/server types
jsimnz Aug 17, 2021
b8089cc
more outline/scaffolding. Need a design doc
jsimnz Aug 17, 2021
09ffdf2
renamed proto service to match server object
jsimnz Sep 16, 2021
099665b
Expanded protobuf definitions, added maked file for protobuf
jsimnz Dec 2, 2021
fa52365
Updated net libp2p protocol definition
jsimnz Dec 6, 2021
62457c0
Updated server/peer objects
jsimnz Dec 6, 2021
f0aaf97
Added libp2p/gostream rpc dialers
jsimnz Dec 14, 2021
7ae19f8
Updated p2p protocolbuffer definitions
jsimnz Dec 20, 2021
d6fc5db
rewrote libp2p/peer import to prevent name clashing
jsimnz Dec 20, 2021
ee09691
initial implementation of the p2p/net server object
jsimnz Jan 4, 2022
266faf0
implemented NewPeer with grpc serve/listen over libp2p
jsimnz Jan 5, 2022
14e6dce
Fixed issues from rebase, lots of context stuff
jsimnz Jan 15, 2022
2fe8830
Added Broadcaster definition and linked into DB, MerkleCRDT, and CRDT…
jsimnz Jan 15, 2022
9920bd6
Wired in Broadcaster object into the DB and CRDT Factory. Created the…
jsimnz Jan 16, 2022
03473aa
Added core.Log type, updated MerkleCRDT implementations to use new lo…
jsimnz Jan 16, 2022
e43d34d
Continued work on P2P sync on Broadcaster, DAGService, Blockstore, an…
jsimnz Jan 16, 2022
4bdaa71
Wired in new doc registers on peer, added peer reference to DB, added…
jsimnz Jan 18, 2022
faa5f46
Updated CompositeDAG to include broadcaster on init, base merkle crdt…
jsimnz Jan 18, 2022
226e1ae
Fixed custom protobuffer DocKey marshalling
jsimnz Jan 18, 2022
f64bdf7
Added cidV1 util. Implemented Collection index via schema ID. Added S…
jsimnz Jan 18, 2022
c24cd25
Updated core interfaces for the p2p stuff
jsimnz Jan 19, 2022
5aed6da
Updated db implementations to match update core/client interfaces. Ad…
jsimnz Jan 19, 2022
f226499
mod dependancy changes during testing to local implementation. NEED T…
jsimnz Jan 19, 2022
f3a2175
extended overhaul of merkle crdt/clock system to accomodate new p2p s…
jsimnz Jan 19, 2022
dc3ad1b
updated comment
jsimnz Feb 2, 2022
2d5aed7
BREAKING: Changed how collections are stored internally, added namesp…
jsimnz Feb 2, 2022
296bfc9
Updated Db/Collection client interface
jsimnz Feb 2, 2022
dd799e7
Implemented new client interface for AllCollections and AllDocKeys
jsimnz Feb 2, 2022
10ccb63
Refactored ListAllKeys system to be more concurrent safe
jsimnz Feb 2, 2022
6b59ddc
Added server side DB dumping (badger only), and added a new example g…
jsimnz Feb 2, 2022
64294dd
Resolved outstanding compile issues from rebase
jsimnz Feb 3, 2022
b79eb66
Fixed all tests from cherry/rebase, now passing :)
jsimnz Feb 3, 2022
75b1502
Updated log/print from rebase
jsimnz Feb 3, 2022
2adbbed
Fixed remaining rebase conflicts and compile issues
jsimnz Feb 4, 2022
3f4fd48
Removed local dev dependencies
jsimnz Feb 4, 2022
b3366e5
Implementing some review comments/remove dead/commented code
jsimnz Feb 5, 2022
081bf84
Minor changes from review comments
jsimnz Feb 5, 2022
f10286b
Final review comments, removed more dead/comment code, fixed dangerou…
jsimnz Feb 5, 2022
42302f9
forgot a few review comments, minor stuff
jsimnz Feb 5, 2022
47e900f
Forgot to update new DataPath option func in CLI code
jsimnz Feb 5, 2022
fb21e3f
PROPOSAL: Re-architect MerkleDAG Broadcast and DB/Peer relationship.
jsimnz Feb 6, 2022
18c7b3c
Fixed compilation + tests after rearchitect
jsimnz Feb 6, 2022
aab347e
Fixed accidental re-instance of variable
jsimnz Feb 6, 2022
43c30d3
Changed no-p2p flag name
jsimnz Feb 6, 2022
7e99b30
Removed old Publish func arg
jsimnz Feb 6, 2022
99a4a21
Fixed job reference wrong field(s)
jsimnz Feb 6, 2022
ac61987
Fixed linter issues
jsimnz Feb 6, 2022
aa6c1bc
Renamed net/server mutex
jsimnz Feb 7, 2022
22b1f62
Add Replicator system for p2p node.
jsimnz Feb 7, 2022
7aa53bf
Added rpc client to CLI and hooked up AddReplicator command
jsimnz Feb 7, 2022
594d362
Fixed replicator RPC issue, moved RPC server from /net/peer to /net/api
jsimnz Feb 7, 2022
0a3aedc
Removed old debug statements
jsimnz Feb 7, 2022
0bbae9a
Fixed linter issues
jsimnz Feb 7, 2022
357fb23
Addressed review comments
jsimnz Feb 7, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion api/http/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,12 @@ func NewServer(db client.DB) *Server {
return s
}

func (s *Server) Listen(addr string) {
func (s *Server) Listen(addr string) error {
if err := http.ListenAndServe(addr, s.router); err != nil {
log.Fatalln("Error: HTTP Listening and Serving Failed: ", err)
return err
}
return nil
}

func (s *Server) ping(w http.ResponseWriter, r *http.Request) {
Expand Down
9 changes: 9 additions & 0 deletions cli/defradb/cmd/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (

type Config struct {
Database Options
Net NetOptions
}

type Options struct {
Expand All @@ -35,6 +36,11 @@ type MemoryOptions struct {
Size uint64
}

type NetOptions struct {
P2PAddress string
P2PDisabled bool
}

var (
defaultConfig = Config{
Database: Options{
Expand All @@ -44,5 +50,8 @@ var (
Path: "$HOME/.defradb/data",
},
},
Net: NetOptions{
P2PAddress: "/ip4/0.0.0.0/tcp/9171",
},
}
)
48 changes: 41 additions & 7 deletions cli/defradb/cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ package cmd

import (
"bytes"
"fmt"
"os"
"strings"

Expand Down Expand Up @@ -63,7 +64,7 @@ func init() {
// Here you will define your flags and configuration settings.
// Cobra supports persistent flags, which, if defined here,
// will be global for your application.
rootCmd.PersistentFlags().StringVar(&cfgFile, "config", "", "config file (default is $HOME/.defradb.yaml)")
rootCmd.PersistentFlags().StringVar(&cfgFile, "config", "", "config file (default is $HOME/.defradb/config.yaml)")
rootCmd.PersistentFlags().StringVar(&dbURL, "url", "http://localhost:9181", "url of the target database")
// Cobra also supports local flags, which will only run
// when this action is called directly.
Expand All @@ -74,11 +75,32 @@ func init() {
}

func initLogger() {
lvl, err := logging.LevelFromString(logLvl)
if err != nil {
panic(err)
lvls := strings.Split(logLvl, ",")
if len(lvls) == 1 {
lvl, err := logging.LevelFromString(logLvl)
if err != nil {
panic(err)
}
logging.SetAllLoggers(lvl)
} else {
lvl, err := logging.LevelFromString(lvls[0])
if err != nil {
panic(err)
}
logging.SetAllLoggers(lvl)

for _, l := range lvls[1:] {
lvl := strings.Split(l, "=")
if len(lvl) != 2 {
fmt.Printf("Invalid format for log level: %s\n", l)
os.Exit(1)
}
if err := logging.SetLogLevel(lvl[0], lvl[1]); err != nil {
fmt.Printf("Failed to set log level: %s\n", err)
os.Exit(1)
}
}
}
logging.SetAllLoggers(lvl)
}

// initConfig reads in config file and ENV variables if set.
Expand Down Expand Up @@ -106,8 +128,11 @@ func initConfig() {
// fmt.Fprintln(os.Stderr, "Using config file:", viper.ConfigFileUsed())
log.Debug("Loading config file:", viper.ConfigFileUsed())
} else {
if err := os.Mkdir(home+"/.defradb", os.ModePerm); err != nil {
cobra.CheckErr(err)
dir := home + "/.defradb"
if _, err := os.Stat(dir); os.IsNotExist(err) {
if err := os.Mkdir(dir, os.ModePerm); err != nil {
cobra.CheckErr(err)
}
}
// if err != nil {
// cobra.CheckErr(err)
Expand All @@ -130,6 +155,15 @@ func initConfig() {
err = viper.BindPFlag("database.store", startCmd.Flags().Lookup("store"))
cobra.CheckErr(err)

err = viper.BindPFlag("database.badger.path", startCmd.Flags().Lookup("data"))
cobra.CheckErr(err)

err = viper.BindPFlag("net.p2paddress", startCmd.Flags().Lookup("p2paddr"))
cobra.CheckErr(err)

err = viper.BindPFlag("net.p2pdisabled", startCmd.Flags().Lookup("no-p2p"))
cobra.CheckErr(err)

err = viper.Unmarshal(&config)
cobra.CheckErr(err)
}
1 change: 1 addition & 0 deletions cli/defradb/cmd/rpc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
package cmd
AndrewSisley marked this conversation as resolved.
Show resolved Hide resolved
69 changes: 69 additions & 0 deletions cli/defradb/cmd/serverdump.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
// Copyright 2020 Source Inc.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
package cmd

import (
"context"
"os"
"os/signal"

ds "github.com/ipfs/go-datastore"
badgerds "github.com/sourcenetwork/defradb/datastores/badger/v3"
"github.com/spf13/cobra"

"github.com/sourcenetwork/defradb/db"
)

// dumpCmd represents the dump command
var srvDumpCmd = &cobra.Command{
Use: "server-dump",
Short: "Dumps the state of the entire database (server side)",
Run: func(cmd *cobra.Command, args []string) {
log.Info("Starting DefraDB process...")
ctx := context.Background()

// setup signal handlers
signalCh := make(chan os.Signal, 1)
signal.Notify(signalCh, os.Interrupt)

var rootstore ds.Batching
var err error
if config.Database.Store == "badger" {
log.Info("opening badger store: ", config.Database.Badger.Path)
rootstore, err = badgerds.NewDatastore(config.Database.Badger.Path, config.Database.Badger.Options)
} else {
log.Error("Server side dump is only supported for the Badger datastore")
os.Exit(1)
}
if err != nil {
log.Error("Failed to initiate datastore:", err)
os.Exit(1)
}

db, err := db.NewDB(rootstore)
if err != nil {
log.Error("Failed to initiate database:", err)
os.Exit(1)
}
if err := db.Start(ctx); err != nil {
log.Error("Failed to start the database: ", err)
db.Close()
os.Exit(1)
}

log.Info("Dumping DB state:")
db.PrintDump(ctx)
},
}

func init() {
rootCmd.AddCommand(srvDumpCmd)
srvDumpCmd.Flags().String("store", "badger", "Specify the data store to use (supported: badger, memory)")
}
85 changes: 77 additions & 8 deletions cli/defradb/cmd/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,25 @@ import (
"context"
"os"
"os/signal"
"strings"

badgerds "github.com/sourcenetwork/defradb/datastores/badger/v3"
"github.com/sourcenetwork/defradb/db"
netutils "github.com/sourcenetwork/defradb/net/utils"
"github.com/sourcenetwork/defradb/node"

badger "github.com/dgraph-io/badger/v3"
ds "github.com/ipfs/go-datastore"
badgerds "github.com/sourcenetwork/defradb/datastores/badger/v3"
"github.com/spf13/cobra"
"github.com/textileio/go-threads/broadcast"
)

var (
p2pAddr string
dataPath string
peers string

busBufferSize = 100
)

// startCmd represents the start command
Expand All @@ -36,25 +48,32 @@ var startCmd = &cobra.Command{
signal.Notify(signalCh, os.Interrupt)

var rootstore ds.Batching
var options interface{}

var err error
if config.Database.Store == "badger" {
log.Info("opening badger store: ", config.Database.Badger.Path)
rootstore, err = badgerds.NewDatastore(config.Database.Badger.Path, config.Database.Badger.Options)
options = config.Database.Badger
} else if config.Database.Store == "memory" {
log.Info("building new memory store")
opts := badgerds.Options{Options: badger.DefaultOptions("").WithInMemory(true)}
rootstore, err = badgerds.NewDatastore("", &opts)
options = config.Database.Memory
}

if err != nil {
log.Error("Failed to initiate datastore:", err)
os.Exit(1)
}

db, err := db.NewDB(rootstore, options)
var options []db.Option

// check for p2p
var bs *broadcast.Broadcaster
if !config.Net.P2PDisabled {
bs = broadcast.NewBroadcaster(busBufferSize)
options = append(options, db.WithBroadcaster(bs))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

:) func. options pattern makes more sense here, is nice

}

db, err := db.NewDB(rootstore, options...)
if err != nil {
log.Error("Failed to initiate database:", err)
os.Exit(1)
Expand All @@ -65,13 +84,60 @@ var startCmd = &cobra.Command{
os.Exit(1)
}

// run the server listener in a separate goroutine
// init the p2p node
var n *node.Node
if !config.Net.P2PDisabled {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cheers :)

n, err = node.NewNode(
ctx,
db,
bs,
node.DataPath(config.Database.Badger.Path),
node.ListenAddrStrings(config.Net.P2PAddress),
node.WithPubSub(true))
if err != nil {
log.Error("Failed to start p2p node:", err)
n.Close() //nolint
db.Close()
os.Exit(1)
}

// parse peers and bootstrap
if len(peers) != 0 {
log.Debug("Parsing boostrap peers: ", peers)
addrs, err := netutils.ParsePeers(strings.Split(peers, ","))
if err != nil {
log.Warn("Failed to parse boostrap peers: ", err)
}
log.Debug("Bootstraping with peers: ", addrs)
n.Boostrap(addrs)
}

if err := n.Start(); err != nil {
AndrewSisley marked this conversation as resolved.
Show resolved Hide resolved
log.Error("Failed to start p2p listener:", err)
n.Close() //nolint
db.Close()
os.Exit(1)
}
}

// run the server listener in a seperate goroutine
go func() {
db.Listen(config.Database.Address)
if err := db.Listen(config.Database.Address); err != nil {
log.Error("Failed to start API listener:", err)
if n != nil {
n.Close() //nolint
}
db.Close()
os.Exit(1)
}
}()

// wait for shutdown signal
<-signalCh
log.Info("Recieved interrupt; closing db")
if n != nil {
n.Close() //nolint
}
db.Close()
os.Exit(0)
},
Expand All @@ -89,5 +155,8 @@ func init() {
// Cobra supports local flags which will only run when this command
// is called directly, e.g.:
startCmd.Flags().String("store", "badger", "Specify the data store to use (supported: badger, memory)")

startCmd.Flags().StringVar(&peers, "peers", "", "list of peers to connect to")
startCmd.Flags().StringVar(&p2pAddr, "p2paddr", "/ip4/0.0.0.0/tcp/9171", "listener address for the p2p network (formatted as a libp2p MultiAddr)")
startCmd.Flags().StringVar(&dataPath, "data", "$HOME/.defradb/data", "Data path to save DB data and other related meta-data")
startCmd.Flags().Bool("no-p2p", false, "Turn off the peer-to-peer network synchroniation system")
}
6 changes: 6 additions & 0 deletions cli/defradb/examples/address.graphql
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
type address {
street: String
number: Int
city: String
country: String
AndrewSisley marked this conversation as resolved.
Show resolved Hide resolved
}
19 changes: 19 additions & 0 deletions client/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,27 @@ import (

blocks "github.com/ipfs/go-block-format"
cid "github.com/ipfs/go-cid"
ds "github.com/ipfs/go-datastore"
)

type DB interface {
// Collections
CreateCollection(context.Context, base.CollectionDescription) (Collection, error)
GetCollection(context.Context, string) (Collection, error)
GetCollectionBySchemaID(context.Context, string) (Collection, error)
ExecQuery(context.Context, string) *QueryResult
SchemaManager() *schema.SchemaManager
AddSchema(context.Context, string) error
PrintDump(ctx context.Context)
GetBlock(ctx context.Context, c cid.Cid) (blocks.Block, error)
Root() ds.Batching
// Rootstore() core.DSReaderWriter
// Headstore() core.DSReaderWriter
// Datastore() core.DSReaderWriter
DAGstore() core.DAGStore
AndrewSisley marked this conversation as resolved.
Show resolved Hide resolved

NewTxn(context.Context, bool) (Txn, error)
GetAllCollections(ctx context.Context) ([]Collection, error)
}

type Sequence interface{}
Expand All @@ -50,6 +60,7 @@ type Collection interface {
Name() string
Schema() base.SchemaDescription
ID() uint32
SchemaID() string

Indexes() []base.IndexDescription
PrimaryIndex() base.IndexDescription
Expand All @@ -76,6 +87,14 @@ type Collection interface {
Get(context.Context, key.DocKey) (*document.Document, error)

WithTxn(Txn) Collection

GetPrimaryIndexDocKey(ds.Key) ds.Key
GetAllDocKeys(ctx context.Context) (<-chan DocKeysResult, error)
}

type DocKeysResult struct {
Key key.DocKey
Err error
}

type UpdateOpt struct{}
Expand Down
Loading