diff --git a/actions/dataset.go b/actions/dataset.go index cd4154b50..11575710d 100644 --- a/actions/dataset.go +++ b/actions/dataset.go @@ -3,7 +3,6 @@ package actions import ( "fmt" "os" - "strings" "github.com/ipfs/go-datastore" "github.com/qri-io/cafs" @@ -93,7 +92,22 @@ func UpdateDataset(node *p2p.QriNode, ref *repo.DatasetRef, dryRun, pin bool) (r } if !base.InLocalNamespace(node.Repo, ref) { - err = fmt.Errorf("remote updates are not yet finished") + var ldr base.LogDiffResult + ldr, err = node.RequestLogDiff(ref) + if err != nil { + return + } + for _, add := range ldr.Add { + if err = base.FetchDataset(node.Repo, &add, true, false); err != nil { + return + } + } + // for _, remove := range ldr.Remove { + // if err = base.UnpinDataset(node.Repo, remove); err != nil { + // return + // } + // } + err = node.Repo.PutRef(ldr.Head) return } @@ -169,42 +183,16 @@ func AddDataset(node *p2p.QriNode, ref *repo.DatasetRef) (err error) { } } - r := node.Repo - key := datastore.NewKey(strings.TrimSuffix(ref.Path, "/"+dsfs.PackageFileDataset.String())) - path := datastore.NewKey(key.String() + "/" + dsfs.PackageFileDataset.String()) - - fetcher, ok := r.Store().(cafs.Fetcher) - if !ok { - err = fmt.Errorf("this store cannot fetch from remote sources") + if err = base.FetchDataset(node.Repo, ref, true, true); err != nil { return } - // TODO: This is asserting that the target is Fetch-able, but inside dsfs.LoadDataset, - // only Get is called. Clean up the semantics of Fetch and Get to get this expection - // more correctly in line with what's actually required. - _, err = fetcher.Fetch(cafs.SourceAny, key) - if err != nil { - return fmt.Errorf("error fetching file: %s", err.Error()) - } - - if err = base.PinDataset(r, *ref); err != nil { - log.Debug(err.Error()) - return fmt.Errorf("error pinning root key: %s", err.Error()) - } - - if err = r.PutRef(*ref); err != nil { + if err = node.Repo.PutRef(*ref); err != nil { log.Debug(err.Error()) return fmt.Errorf("error putting dataset name in repo: %s", err.Error()) } - ds, err := dsfs.LoadDataset(r.Store(), path) - if err != nil { - log.Debug(err.Error()) - return fmt.Errorf("error loading newly saved dataset path: %s", path.String()) - } - - ref.Dataset = ds.Encode() - return + return nil } // SetPublishStatus configures the publish status of a stored reference diff --git a/base/dataset.go b/base/dataset.go index a99ec5c80..516e4e2e9 100644 --- a/base/dataset.go +++ b/base/dataset.go @@ -146,6 +146,45 @@ func CreateDataset(r repo.Repo, streams ioes.IOStreams, name string, ds *dataset return } +// FetchDataset grabs a dataset from a remote source +func FetchDataset(r repo.Repo, ref *repo.DatasetRef, pin, load bool) (err error) { + key := datastore.NewKey(strings.TrimSuffix(ref.Path, "/"+dsfs.PackageFileDataset.String())) + path := datastore.NewKey(key.String() + "/" + dsfs.PackageFileDataset.String()) + + fetcher, ok := r.Store().(cafs.Fetcher) + if !ok { + err = fmt.Errorf("this store cannot fetch from remote sources") + return + } + + // TODO: This is asserting that the target is Fetch-able, but inside dsfs.LoadDataset, + // only Get is called. Clean up the semantics of Fetch and Get to get this expection + // more correctly in line with what's actually required. + _, err = fetcher.Fetch(cafs.SourceAny, key) + if err != nil { + return fmt.Errorf("error fetching file: %s", err.Error()) + } + + if pin { + if err = PinDataset(r, *ref); err != nil { + log.Debug(err.Error()) + return fmt.Errorf("error pinning root key: %s", err.Error()) + } + } + + if load { + ds, err := dsfs.LoadDataset(r.Store(), path) + if err != nil { + log.Debug(err.Error()) + return fmt.Errorf("error loading newly saved dataset path: %s", path.String()) + } + + ref.Dataset = ds.Encode() + } + + return +} + // ReadDataset grabs a dataset from the store func ReadDataset(r repo.Repo, ref *repo.DatasetRef) (err error) { if store := r.Store(); store != nil { diff --git a/base/dataset_test.go b/base/dataset_test.go index a4261972b..8091c3479 100644 --- a/base/dataset_test.go +++ b/base/dataset_test.go @@ -7,11 +7,10 @@ import ( "net/http/httptest" "testing" - "github.com/qri-io/ioes" - "github.com/qri-io/cafs" "github.com/qri-io/dataset" "github.com/qri-io/dataset/dstest" + "github.com/qri-io/ioes" "github.com/qri-io/qri/repo" "github.com/qri-io/qri/repo/profile" ) @@ -99,6 +98,23 @@ func TestCreateDataset(t *testing.T) { } } +func TestFetchDataset(t *testing.T) { + r1 := newTestRepo(t) + r2 := newTestRepo(t) + ref := addCitiesDataset(t, r2) + + // Connect in memory Mapstore's behind the scene to simulate IPFS-like behavior. + r1.Store().(*cafs.MapStore).AddConnection(r2.Store().(*cafs.MapStore)) + + if err := FetchDataset(r1, &repo.DatasetRef{Peername: "foo", Name: "bar"}, true, true); err == nil { + t.Error("expected add of invalid ref to error") + } + + if err := FetchDataset(r1, &ref, true, true); err != nil { + t.Error(err.Error()) + } +} + func TestDatasetPodBodyFile(t *testing.T) { s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { w.Write([]byte(`{"json":"data"}`)) diff --git a/p2p/log_diff.go b/p2p/log_diff.go new file mode 100644 index 000000000..8b4edd52e --- /dev/null +++ b/p2p/log_diff.go @@ -0,0 +1,92 @@ +package p2p + +import ( + "encoding/json" + "fmt" + + "github.com/qri-io/qri/base" + "github.com/qri-io/qri/repo" +) + +// MtLogDiff gets info on a dataset +const MtLogDiff = MsgType("log_diff") + +// RequestLogDiff fetches info about a dataset from qri peers +// It's expected the local peer has attempted to canonicalize the reference +// before sending to the network +func (n *QriNode) RequestLogDiff(ref *repo.DatasetRef) (ldr base.LogDiffResult, err error) { + log.Debugf("%s RequestLogDiff %s", n.ID, ref) + + p, err := n.ConnectToPeer(n.ctx, PeerConnectionParams{ + Peername: ref.Peername, + ProfileID: ref.ProfileID, + }) + + if err != nil { + err = fmt.Errorf("coudn't connection to peer: %s", err.Error()) + return + } + + // TODO - deal with max limit / offset / pagination issuez + rLog, err := base.DatasetLog(n.Repo, *ref, 10000, 0, false) + if err != nil { + return + } + + replies := make(chan Message) + req, err := NewJSONBodyMessage(n.ID, MtLogDiff, rLog) + req = req.WithHeaders("phase", "request") + if err != nil { + log.Debug(err.Error()) + return + } + + for _, pid := range p.PeerIDs { + if err = n.SendMessage(req, replies, pid); err != nil { + log.Debugf("%s err: %s", pid, err.Error()) + continue + } + + res := <-replies + ldr = base.LogDiffResult{} + if err = json.Unmarshal(res.Body, &ldr); err == nil { + break + } + } + + return +} + +func (n *QriNode) handleLogDiff(ws *WrappedStream, msg Message) (hangup bool) { + hangup = true + + switch msg.Header("phase") { + case "request": + remoteLog := []repo.DatasetRef{} + if err := json.Unmarshal(msg.Body, &remoteLog); err != nil { + log.Debug(err.Error()) + return + } + + res := msg + ldr, err := base.LogDiff(n.Repo, remoteLog) + if err != nil { + log.Error(err) + return + } + + res, err = msg.UpdateJSON(ldr) + if err != nil { + log.Error(err) + return + } + + res = res.WithHeaders("phase", "response") + if err := ws.sendMessage(res); err != nil { + log.Debug(err.Error()) + return + } + } + + return +} diff --git a/p2p/log_diff_test.go b/p2p/log_diff_test.go new file mode 100644 index 000000000..43b6dfcce --- /dev/null +++ b/p2p/log_diff_test.go @@ -0,0 +1,73 @@ +package p2p + +import ( + "testing" +) + +func TestRequestLogDiff(t *testing.T) { + // ctx := context.Background() + // streams := ioes.NewDiscardIOStreams() + // factory := p2ptest.NewTestNodeFactory(NewTestableQriNode) + // testPeers, err := p2ptest.NewTestDirNetwork(ctx, factory) + // if err != nil { + // t.Fatalf("error creating network: %s", err.Error()) + // } + // if err := p2ptest.ConnectQriNodes(ctx, testPeers); err != nil { + // t.Fatalf("error connecting peers: %s", err.Error()) + // } + + // peers := asQriNodes(testPeers) + + // tc, err := dstest.NewTestCaseFromDir("testdata/tim/craigslist") + // if err != nil { + // t.Fatal(err) + // } + + // // add a dataset to tim + // ref, _, err := base.CreateDataset(peers[4].Repo, streams, tc.Name, tc.Input, tc.BodyFile(), false, true) + // if err != nil { + // t.Fatal(err) + // } + + // // + // // peers[] + + // prevTitle := tc.Input.Meta.Title + // tc.Input.Meta.Title = "update" + // tc.Input.PreviousPath = ref.Path + // defer func() { + // // because test cases are cached for performance, we need to clean up any mutation to + // // testcase input + // tc.Input.Meta.Title = prevTitle + // tc.Input.PreviousPath = ref.Path + // }() + + // ref2, _, err := base.CreateDataset(peers[4].Repo, streams, tc.Name, tc.Input, tc.BodyFile(), false, true) + // if err != nil { + // t.Fatal(err) + // } + + // t.Logf("testing RequestDatasetLog message with %d peers", len(peers)) + // // var wg sync.WaitGroup + // for i, p1 := range peers { + // for _, p2 := range peers[i+1:] { + // // TODO - having these in parallel is causing races when encoding logs + // // wg.Add(1) + // // go func(p1, p2 *QriNode) { + // // defer wg.Done() + + // refs, err := p1.RequestDatasetLog(ref, 100, 0) + // if err != nil { + // t.Errorf("%s -> %s error: %s", p1.ID.Pretty(), p2.ID.Pretty(), err.Error()) + // } + // if refs == nil { + // t.Error("profile shouldn't be nil") + // return + // } + // // t.Log(refs) + // // }(p1, p2) + // } + // } + + // wg.Wait() +} diff --git a/p2p/node.go b/p2p/node.go index 9a4ebaaee..38f1eeedf 100644 --- a/p2p/node.go +++ b/p2p/node.go @@ -416,5 +416,6 @@ func MakeHandlers(n *QriNode) map[MsgType]HandlerFunc { MtResolveDatasetRef: n.handleResolveDatasetRef, MtDatasetLog: n.handleDatasetLog, MtQriPeers: n.handleQriPeers, + MtLogDiff: n.handleLogDiff, } }