Skip to content

Commit

Permalink
feat(p2p.AnnounceDatasetChanges): new message for announcing datasets
Browse files Browse the repository at this point in the history
  • Loading branch information
b5 committed Mar 19, 2018
1 parent 7a4e292 commit 29016e6
Show file tree
Hide file tree
Showing 13 changed files with 381 additions and 40 deletions.
7 changes: 7 additions & 0 deletions p2p/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,12 @@ type NodeCfg struct {
// Online is a flag for weather this node should connect
// to the distributed network
Online bool

// SelfReplication determines what to do when this peer sees messages
// broadcast by it's own profile (from another peer instance). setting
// SelfReplication == "full" will cause this peer to automatically pin
// any data that is verifyably posted by the same peer
SelfReplication string
}

// DefaultNodeCfg generates sensible settings for a Qri Node
Expand All @@ -59,6 +65,7 @@ func DefaultNodeCfg() *NodeCfg {
PubKey: pub,
QriBootstrapAddrs: DefaultBootstrapAddresses,
Secure: true,
SelfReplication: "full",
}
}

Expand Down
120 changes: 120 additions & 0 deletions p2p/dataset_changes.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
package p2p

import (
"encoding/json"
"strings"
// "time"

"github.com/ipfs/go-datastore"
"github.com/qri-io/cafs"
ipfs "github.com/qri-io/cafs/ipfs"
"github.com/qri-io/dataset/dsfs"
"github.com/qri-io/qri/repo"
)

// MtDatasetChanges is a message to announce added / removed datasets to the network
const MtDatasetChanges = MsgType("dataset_changes")

// DatasetChanges describes created & deleted datasets with slices of
// repo DatsetRef strings.
// Because dataset data is immutable, All changes should be describable
// as creations and deletions.
// Dataset names, however, *are* mutable. Renaming is conveyed by listing
// the former ref as deleted & new ref as created.
type DatasetChanges struct {
Created []string
Deleted []string
}

// AnnounceDatasetChanges transmits info of dataset changes to
func (n *QriNode) AnnounceDatasetChanges(changes DatasetChanges) error {
log.Debugf("%s: AnnounceDatasetChanges", n.ID)

msg, err := NewJSONBodyMessage(n.ID, MtDatasetChanges, changes)
if err != nil {
return err
}

// grab 50 peers & fire off our announcement to them
return n.SendMessage(msg, nil, n.ClosestConnectedPeers("", 50)...)
}

func (n *QriNode) handleDatasetChanges(ws *WrappedStream, msg Message) (hangup bool) {
hangup = true

// only handle messages once, and if they're not too old
// if _, ok := n.msgState.Load(msg.ID); ok || time.Now().After(msg.Deadline) {
// return
// }

n.msgState.Store(msg.ID, msg.Deadline)

pro, err := n.Repo.Profile()
if err != nil {
log.Debug(err.Error())
return
}

changes := DatasetChanges{}
if err := json.Unmarshal(msg.Body, &changes); err != nil {
log.Debug(err.Error())
return
}

for _, add := range changes.Created {
if ref, err := repo.ParseDatasetRef(add); err == nil {
if err := n.Repo.RefCache().PutRef(ref); err != nil {
log.Debug(err.Error())
}

if ref.PeerID == pro.ID && n.selfReplication == "full" {
log.Infof("%s %s self replicating dataset %s from %s", n.ID.Pretty(), pro.ID, ref.String(), msg.Initiator.Pretty())

if err = n.Repo.PutRef(ref); err != nil {
log.Debug(err.Error())
return
}

go func() {
// TODO - this is pulled out of core.DatasetRequests.Add
// we should find a common place for this code & have both add & this func call it
// possibly in / subpackage of repo?
fs, ok := n.Repo.Store().(*ipfs.Filestore)
if !ok {
log.Debug("can only add datasets when running an IPFS filestore")
return
}

key := datastore.NewKey(strings.TrimSuffix(ref.Path, "/"+dsfs.PackageFileDataset.String()))

_, e := fs.Fetch(cafs.SourceAny, key)
if e != nil {
log.Debugf("error fetching file: %s", e.Error())
return
}

e = fs.Pin(key, true)
if e != nil {
log.Debug(e.Error())
return
}
}()
}

} else {
log.Debug(err.Error())
}
}

for _, remove := range changes.Deleted {
if ref, err := repo.ParseDatasetRef(remove); err == nil {
if err := n.Repo.RefCache().DeleteRef(ref); err != nil {
log.Debug(err.Error())
}
} else {
log.Debug(err.Error())
}
}

return
}
180 changes: 180 additions & 0 deletions p2p/dataset_changes_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,180 @@
package p2p

import (
"context"
"fmt"
"github.com/qri-io/qri/repo/profile"
"sync"
"testing"

"github.com/qri-io/qri/repo"
)

func TestAnnounceDatasetChanges(t *testing.T) {
ctx := context.Background()
peers, err := NewTestNetwork(ctx, t, 5)
if err != nil {
t.Errorf("error creating network: %s", err.Error())
return
}

if err := connectNodes(ctx, peers); err != nil {
t.Errorf("error connecting peers: %s", err.Error())
}

t.Logf("testing AnnounceDatasetChanges message with %d peers", len(peers))
var wg sync.WaitGroup
for i, p := range peers {
wg.Add(1)

r := make(chan Message)
p.ReceiveMessages(r)

go func(p *QriNode) {
for {
msg := <-r

if msg.Type != MtDatasetChanges {
t.Error("expected only dataset_changes messages")
}

count, err := p.Repo.RefCache().RefCount()
if err != nil {
t.Errorf("%s, error getting RefCount: %s", p.ID, err.Error())
}

if count == len(peers)-1 {
wg.Done()
return
}
}
}(p)

go func(i int, p *QriNode) {
if err := p.AnnounceDatasetChanges(DatasetChanges{
Created: []string{
repo.DatasetRef{Peername: fmt.Sprintf("peer-%d", i), PeerID: p.ID.Pretty(), Name: fmt.Sprintf("dataset-%d", i), Path: fmt.Sprintf("/ipfs/QmeLid2tvSZvuUDStCbP3zvzDk1977JmUcFxD9tYjwwmY%d", i)}.String(),
},
}); err != nil {
t.Errorf("%s error: %s", p.ID.Pretty(), err.Error())
}
}(i, p)
}

wg.Wait()
}

func TestSelfReplication(t *testing.T) {

ctx := context.Background()
peers, err := NewTestDirNetwork(ctx, t)
if err != nil {
t.Error(err.Error())
return
}

box1 := peers[0]

pro, err := box1.Repo.Profile()
if err != nil {
t.Error(err.Error())
return
}

pid, err := profile.IDB58Decode(pro.ID)
if err != nil {
t.Error(err.Error())
return
}

box2Repo, err := NewTestRepo(pid)
if err != nil {
t.Error(err.Error())
return
}

box2, err := newTestQriNode(box2Repo, t)
if err != nil {
t.Error(err.Error())
return
}

// pro, _ := box2.Repo.Profile()
// log.Debugf("box2 ids: %s %s", box2.ID.Pretty(), pro.ID)

peers = append(peers, box2)
if err := connectNodes(ctx, peers); err != nil {
t.Error(err.Error())
return
}

rcpre, err := box2.Repo.RefCount()
if err != nil {
t.Error(err.Error())
return
}

t.Logf("testing %d peers", len(peers))
var wg sync.WaitGroup

for i, p := range peers {
wg.Add(1)

r := make(chan Message)
p.ReceiveMessages(r)

go func(p *QriNode) {
for {
msg := <-r

if msg.Type != MtDatasetChanges {
t.Error("expected only dataset_changes messages")
}

count, err := p.Repo.RefCache().RefCount()
if err != nil {
t.Errorf("%s, error getting RefCount: %s", p.ID, err.Error())
}

// log.Debugf("%s has %d refs", p.ID, count)

if count == len(peers)-2 {
// log.Debugf("%s is done", p.ID)
wg.Done()
return
}
}
}(p)

go func(i int, p *QriNode) {
refs, err := p.Repo.References(1, 0)
if err != nil {
t.Errorf("%s error: %s", p.ID.Pretty(), err.Error())
}

if len(refs) > 0 {
if err := p.AnnounceDatasetChanges(DatasetChanges{
Created: []string{
refs[0].String(),
},
}); err != nil {
t.Errorf("%s error: %s", p.ID.Pretty(), err.Error())
}
}

}(i, p)
}

wg.Wait()

rcpost, err := box2.Repo.RefCount()
if err != nil {
t.Error(err.Error())
return
}

if rcpre+1 != rcpost {
t.Errorf("expected box2 refcount to increment by 1. expected: %d, got: %d", rcpre+1, rcpost)
}

}
22 changes: 0 additions & 22 deletions p2p/datasets.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,25 +96,3 @@ func (n *QriNode) handleDatasetsList(ws *WrappedStream, msg Message) (hangup boo

return
}

// MtDatasetsCreated announces the creation of one or more datasets
// const MtDatasetsCreated = MsgType("datasets_created")

// func (n *QriNode) AnnounceDatasetsCreated(ds ...repo.DatasetRef) error {

// }

// func (n QriNode) handleDatasestsCreated(ws *WrappedStream, msg Message) error {

// }

// // MtDatasetInfo gets info on a dataset
// const MtDatasetsDeleted = MsgType("datasets_deleted")

// func (n *QriNode) AnnounceDatasetsCreated(ds ...repo.DatasetRef) error {

// }

// func (n QriNode) handleDatasestsCreated(ws *WrappedStream, msg Message) error {

// }
2 changes: 2 additions & 0 deletions p2p/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ func NewMessage(initiator peer.ID, t MsgType, body []byte) Message {
return Message{
ID: NewMessageID(),
Initiator: initiator,
Created: time.Now(),
Deadline: time.Now().Add(time.Minute * 2),
Type: t,
Headers: map[string]string{},
Body: body,
Expand Down
Loading

0 comments on commit 29016e6

Please sign in to comment.