Skip to content

Commit

Permalink
feat(p2p update): initial p2p update
Browse files Browse the repository at this point in the history
This comes with a few goodies. I've moved the logic from Add that leverages the cafs.Store.Fetcher interface into a base package function called FetchDataset. This is now used in both add and remote updates. I've also bumped tests in a few spots, with more tests still to write.
  • Loading branch information
b5 committed Nov 2, 2018
1 parent bd7340a commit 76e886a
Show file tree
Hide file tree
Showing 6 changed files with 242 additions and 33 deletions.
50 changes: 19 additions & 31 deletions actions/dataset.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package actions
import (
"fmt"
"os"
"strings"

"github.com/ipfs/go-datastore"
"github.com/qri-io/cafs"
Expand Down Expand Up @@ -93,7 +92,22 @@ func UpdateDataset(node *p2p.QriNode, ref *repo.DatasetRef, dryRun, pin bool) (r
}

if !base.InLocalNamespace(node.Repo, ref) {
err = fmt.Errorf("remote updates are not yet finished")
var ldr base.LogDiffResult
ldr, err = node.RequestLogDiff(ref)
if err != nil {
return
}
for _, add := range ldr.Add {
if err = base.FetchDataset(node.Repo, &add, true, false); err != nil {
return
}
}
// for _, remove := range ldr.Remove {
// if err = base.UnpinDataset(node.Repo, remove); err != nil {
// return
// }
// }
err = node.Repo.PutRef(ldr.Head)
return
}

Expand Down Expand Up @@ -169,42 +183,16 @@ func AddDataset(node *p2p.QriNode, ref *repo.DatasetRef) (err error) {
}
}

r := node.Repo
key := datastore.NewKey(strings.TrimSuffix(ref.Path, "/"+dsfs.PackageFileDataset.String()))
path := datastore.NewKey(key.String() + "/" + dsfs.PackageFileDataset.String())

fetcher, ok := r.Store().(cafs.Fetcher)
if !ok {
err = fmt.Errorf("this store cannot fetch from remote sources")
if err = base.FetchDataset(node.Repo, ref, true, true); err != nil {
return
}

// TODO: This is asserting that the target is Fetch-able, but inside dsfs.LoadDataset,
// only Get is called. Clean up the semantics of Fetch and Get to get this expection
// more correctly in line with what's actually required.
_, err = fetcher.Fetch(cafs.SourceAny, key)
if err != nil {
return fmt.Errorf("error fetching file: %s", err.Error())
}

if err = base.PinDataset(r, *ref); err != nil {
log.Debug(err.Error())
return fmt.Errorf("error pinning root key: %s", err.Error())
}

if err = r.PutRef(*ref); err != nil {
if err = node.Repo.PutRef(*ref); err != nil {
log.Debug(err.Error())
return fmt.Errorf("error putting dataset name in repo: %s", err.Error())
}

ds, err := dsfs.LoadDataset(r.Store(), path)
if err != nil {
log.Debug(err.Error())
return fmt.Errorf("error loading newly saved dataset path: %s", path.String())
}

ref.Dataset = ds.Encode()
return
return nil
}

// SetPublishStatus configures the publish status of a stored reference
Expand Down
39 changes: 39 additions & 0 deletions base/dataset.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,45 @@ func CreateDataset(r repo.Repo, streams ioes.IOStreams, name string, ds *dataset
return
}

// FetchDataset grabs a dataset from a remote source via the repo's
// underlying content-addressed store, which must implement cafs.Fetcher.
//
// When pin is true the fetched dataset is pinned to the local store.
// When load is true the dataset file is read back from the store and its
// encoded form is attached to ref.Dataset.
func FetchDataset(r repo.Repo, ref *repo.DatasetRef, pin, load bool) (err error) {
	// ref.Path may or may not already carry the package-file suffix;
	// normalize to the root key, then derive the full dataset file path.
	rootKey := datastore.NewKey(strings.TrimSuffix(ref.Path, "/"+dsfs.PackageFileDataset.String()))
	dsPath := datastore.NewKey(rootKey.String() + "/" + dsfs.PackageFileDataset.String())

	fetcher, ok := r.Store().(cafs.Fetcher)
	if !ok {
		return fmt.Errorf("this store cannot fetch from remote sources")
	}

	// TODO: This is asserting that the target is Fetch-able, but inside
	// dsfs.LoadDataset only Get is called. Clean up the semantics of Fetch
	// and Get to bring this expectation in line with what's actually required.
	if _, err = fetcher.Fetch(cafs.SourceAny, rootKey); err != nil {
		return fmt.Errorf("error fetching file: %s", err.Error())
	}

	if pin {
		if err = PinDataset(r, *ref); err != nil {
			log.Debug(err.Error())
			return fmt.Errorf("error pinning root key: %s", err.Error())
		}
	}

	if load {
		ds, loadErr := dsfs.LoadDataset(r.Store(), dsPath)
		if loadErr != nil {
			log.Debug(loadErr.Error())
			return fmt.Errorf("error loading newly saved dataset path: %s", dsPath.String())
		}
		ref.Dataset = ds.Encode()
	}

	return nil
}

// ReadDataset grabs a dataset from the store
func ReadDataset(r repo.Repo, ref *repo.DatasetRef) (err error) {
if store := r.Store(); store != nil {
Expand Down
20 changes: 18 additions & 2 deletions base/dataset_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,10 @@ import (
"net/http/httptest"
"testing"

"github.com/qri-io/ioes"

"github.com/qri-io/cafs"
"github.com/qri-io/dataset"
"github.com/qri-io/dataset/dstest"
"github.com/qri-io/ioes"
"github.com/qri-io/qri/repo"
"github.com/qri-io/qri/repo/profile"
)
Expand Down Expand Up @@ -99,6 +98,23 @@ func TestCreateDataset(t *testing.T) {
}
}

// TestFetchDataset checks that a dataset can be pulled from one repo's
// store into another, and that fetching an unresolvable reference errors.
func TestFetchDataset(t *testing.T) {
	dst := newTestRepo(t)
	src := newTestRepo(t)
	ref := addCitiesDataset(t, src)

	// Connect in-memory MapStores behind the scenes to simulate IPFS-like behavior.
	dst.Store().(*cafs.MapStore).AddConnection(src.Store().(*cafs.MapStore))

	badRef := repo.DatasetRef{Peername: "foo", Name: "bar"}
	if err := FetchDataset(dst, &badRef, true, true); err == nil {
		t.Error("expected add of invalid ref to error")
	}

	if err := FetchDataset(dst, &ref, true, true); err != nil {
		t.Error(err.Error())
	}
}

func TestDatasetPodBodyFile(t *testing.T) {
s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Write([]byte(`{"json":"data"}`))
Expand Down
92 changes: 92 additions & 0 deletions p2p/log_diff.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
package p2p

import (
"encoding/json"
"fmt"

"github.com/qri-io/qri/base"
"github.com/qri-io/qri/repo"
)

// MtLogDiff gets info on a dataset
const MtLogDiff = MsgType("log_diff")

// RequestLogDiff fetches a log diff for a dataset from qri peers.
// It's expected the local peer has attempted to canonicalize the reference
// before sending to the network.
func (n *QriNode) RequestLogDiff(ref *repo.DatasetRef) (ldr base.LogDiffResult, err error) {
	log.Debugf("%s RequestLogDiff %s", n.ID, ref)

	p, err := n.ConnectToPeer(n.ctx, PeerConnectionParams{
		Peername:  ref.Peername,
		ProfileID: ref.ProfileID,
	})
	if err != nil {
		err = fmt.Errorf("couldn't connect to peer: %s", err.Error())
		return
	}

	// TODO - deal with max limit / offset / pagination issues
	rLog, err := base.DatasetLog(n.Repo, *ref, 10000, 0, false)
	if err != nil {
		return
	}

	replies := make(chan Message)
	req, err := NewJSONBodyMessage(n.ID, MtLogDiff, rLog)
	// check the construction error before using req; calling WithHeaders on a
	// zero-value message after a failed construction was the previous (buggy) order
	if err != nil {
		log.Debug(err.Error())
		return
	}
	req = req.WithHeaders("phase", "request")

	// try each known address for the peer until one replies with a
	// log-diff result we can decode
	for _, pid := range p.PeerIDs {
		if err = n.SendMessage(req, replies, pid); err != nil {
			log.Debugf("%s err: %s", pid, err.Error())
			continue
		}

		// NOTE(review): this receive has no timeout — a peer that accepts the
		// message but never replies will block forever. TODO: add a deadline.
		res := <-replies
		ldr = base.LogDiffResult{}
		if err = json.Unmarshal(res.Body, &ldr); err == nil {
			break
		}
	}

	return
}

// handleLogDiff responds to "request"-phase log_diff messages by diffing
// the sender's dataset log against the local repo and replying with the
// result. It always hangs up the stream when done.
func (n *QriNode) handleLogDiff(ws *WrappedStream, msg Message) (hangup bool) {
	hangup = true

	switch msg.Header("phase") {
	case "request":
		remoteLog := []repo.DatasetRef{}
		if err := json.Unmarshal(msg.Body, &remoteLog); err != nil {
			log.Debug(err.Error())
			return
		}

		ldr, err := base.LogDiff(n.Repo, remoteLog)
		if err != nil {
			log.Error(err)
			return
		}

		// res comes straight from UpdateJSON (previously there was a dead
		// store `res := msg` that was immediately overwritten)
		res, err := msg.UpdateJSON(ldr)
		if err != nil {
			log.Error(err)
			return
		}

		res = res.WithHeaders("phase", "response")
		if err := ws.sendMessage(res); err != nil {
			log.Debug(err.Error())
			return
		}
	}

	return
}
73 changes: 73 additions & 0 deletions p2p/log_diff_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package p2p

import (
"testing"
)

// TestRequestLogDiff is a currently-disabled skeleton for exercising
// RequestLogDiff across a test peer network. The commented code below was
// adapted from a RequestDatasetLog test and still calls RequestDatasetLog;
// it must be updated to call RequestLogDiff before being enabled.
// TODO (review): finish & enable this test.
func TestRequestLogDiff(t *testing.T) {
	// ctx := context.Background()
	// streams := ioes.NewDiscardIOStreams()
	// factory := p2ptest.NewTestNodeFactory(NewTestableQriNode)
	// testPeers, err := p2ptest.NewTestDirNetwork(ctx, factory)
	// if err != nil {
	// t.Fatalf("error creating network: %s", err.Error())
	// }
	// if err := p2ptest.ConnectQriNodes(ctx, testPeers); err != nil {
	// t.Fatalf("error connecting peers: %s", err.Error())
	// }

	// peers := asQriNodes(testPeers)

	// tc, err := dstest.NewTestCaseFromDir("testdata/tim/craigslist")
	// if err != nil {
	// t.Fatal(err)
	// }

	// // add a dataset to tim
	// ref, _, err := base.CreateDataset(peers[4].Repo, streams, tc.Name, tc.Input, tc.BodyFile(), false, true)
	// if err != nil {
	// t.Fatal(err)
	// }

	// //
	// // peers[]

	// prevTitle := tc.Input.Meta.Title
	// tc.Input.Meta.Title = "update"
	// tc.Input.PreviousPath = ref.Path
	// defer func() {
	// // because test cases are cached for performance, we need to clean up any mutation to
	// // testcase input
	// tc.Input.Meta.Title = prevTitle
	// tc.Input.PreviousPath = ref.Path
	// }()

	// ref2, _, err := base.CreateDataset(peers[4].Repo, streams, tc.Name, tc.Input, tc.BodyFile(), false, true)
	// if err != nil {
	// t.Fatal(err)
	// }

	// t.Logf("testing RequestDatasetLog message with %d peers", len(peers))
	// // var wg sync.WaitGroup
	// for i, p1 := range peers {
	// for _, p2 := range peers[i+1:] {
	// // TODO - having these in parallel is causing races when encoding logs
	// // wg.Add(1)
	// // go func(p1, p2 *QriNode) {
	// // defer wg.Done()

	// refs, err := p1.RequestDatasetLog(ref, 100, 0)
	// if err != nil {
	// t.Errorf("%s -> %s error: %s", p1.ID.Pretty(), p2.ID.Pretty(), err.Error())
	// }
	// if refs == nil {
	// t.Error("profile shouldn't be nil")
	// return
	// }
	// // t.Log(refs)
	// // }(p1, p2)
	// }
	// }

	// wg.Wait()
}
1 change: 1 addition & 0 deletions p2p/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -416,5 +416,6 @@ func MakeHandlers(n *QriNode) map[MsgType]HandlerFunc {
MtResolveDatasetRef: n.handleResolveDatasetRef,
MtDatasetLog: n.handleDatasetLog,
MtQriPeers: n.handleQriPeers,
MtLogDiff: n.handleLogDiff,
}
}

0 comments on commit 76e886a

Please sign in to comment.