Skip to content

Commit

Permalink
refactor(dsync): incorporate dsync updates
Browse files Browse the repository at this point in the history
  • Loading branch information
b5 committed Aug 21, 2019
1 parent d0dc9d9 commit e6341fc
Show file tree
Hide file tree
Showing 5 changed files with 51 additions and 40 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -83,4 +83,4 @@ build-cross-platform:
@echo "building qri_darwin_amd64"
mkdir qri_darwin_amd64
env GOOS=darwin GOARCH=amd64 go build -o qri_darwin_amd64/qri .
zip -r qri_darwin_amd64.zip qri_darwin_amd64 && rm -r qri_darwin_amd64
zip -r qri_darwin_amd64.zip qri_darwin_amd64 && rm -r qri_darwin_amd64
14 changes: 14 additions & 0 deletions lib/lib.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ import (

golog "github.com/ipfs/go-log"
homedir "github.com/mitchellh/go-homedir"
"github.com/qri-io/dag"
"github.com/qri-io/dag/dsync"
"github.com/qri-io/ioes"
"github.com/qri-io/qfs"
"github.com/qri-io/qfs/cafs"
Expand Down Expand Up @@ -70,6 +72,11 @@ func Receivers(inst *Instance) []Methods {
NewConfigMethods(inst),
NewSearchRequests(node, nil),
NewRenderRequests(r, nil),
<<<<<<< HEAD
=======
NewSelectionRequests(r, nil),
NewRemoteMethods(inst),
>>>>>>> refactor(dsync): incorporate dsync updates
NewUpdateMethods(inst),
NewFSIMethods(inst),
}
Expand Down Expand Up @@ -266,6 +273,12 @@ func NewInstance(ctx context.Context, repoPath string, opts ...Option) (qri *Ins
}
inst.node.LocalStreams = o.Streams

capi, err := inst.node.IPFSCoreAPI()
if err != nil {
return err
}
inst.dsync = dsync.New(dag.NewNodeGetter(capi), capi.Block())

return
}

Expand Down Expand Up @@ -439,6 +452,7 @@ type Instance struct {
repo repo.Repo
node *p2p.QriNode
cron cron.Scheduler
dsync *dsync.Dsync

rpc *rpc.Client
}
Expand Down
5 changes: 3 additions & 2 deletions lib/params.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,9 @@ func (lp ListParams) Page() util.Page {

// PushParams holds parameters for pushing daginfo to remotes
type PushParams struct {
Ref string
RemoteName string
Ref string
RemoteName string
PinOnComplete bool
}

// ReceiveParams hold parameters for receiving daginfo's when running as a remote
Expand Down
66 changes: 30 additions & 36 deletions lib/remote.go
Original file line number Diff line number Diff line change
@@ -1,50 +1,35 @@
package lib

import (
"context"
"fmt"
"net/rpc"
"sync"
"time"

"github.com/qri-io/dag/dsync"
"github.com/qri-io/qri/actions"
"github.com/qri-io/qri/base"
"github.com/qri-io/qri/config"
"github.com/qri-io/qri/p2p"
"github.com/qri-io/qri/repo"
)

const allowedDagInfoSize uint64 = 10 * 1024 * 1024

// RemoteRequests encapsulates business logic of remote operation
// RemoteMethods encapsulates business logic of remote operation
// TODO (b5): switch to using an Instance instead of separate fields
type RemoteRequests struct {
cfg *config.Config
cli *rpc.Client
node *p2p.QriNode
Receivers *dsync.Receivers
Sessions map[string]*ReceiveParams
lock sync.Mutex
type RemoteMethods struct {
inst *Instance
}

// NewRemoteRequests creates a RemoteRequests pointer from either a node or an rpc.Client
func NewRemoteRequests(node *p2p.QriNode, cfg *config.Config, cli *rpc.Client) *RemoteRequests {
if node != nil && cli != nil {
panic(fmt.Errorf("both repo and client supplied to NewRemoteRequests"))
}
return &RemoteRequests{
cfg: cfg,
cli: cli,
node: node,
Sessions: make(map[string]*ReceiveParams),
// NewRemoteMethods creates a RemoteMethods pointer from either a node or an rpc.Client
func NewRemoteMethods(inst *Instance) *RemoteMethods {
return &RemoteMethods{
inst: inst,
}
}

// CoreRequestsName implements the Requests interface
func (*RemoteRequests) CoreRequestsName() string { return "remote" }
func (*RemoteMethods) CoreRequestsName() string { return "remote" }

// PushToRemote posts a dagInfo to a remote
func (r *RemoteRequests) PushToRemote(p *PushParams, out *bool) error {
func (r *RemoteMethods) PushToRemote(p *PushParams, out *bool) error {
if r.cli != nil {
return r.cli.Call("DatasetRequests.PushToRemote", p, out)
}
Expand All @@ -67,27 +52,36 @@ func (r *RemoteRequests) PushToRemote(p *PushParams, out *bool) error {
return fmt.Errorf("remote name \"%s\" not found", p.RemoteName)
}

sessionID, dagDiff, err := actions.DsyncStartPush(r.node, dagInfo, location, &ref)
push, err := r.inst.dsync.NewPushManifest(dagInfo.Manifest, location, PushParams.Pin)
if err != nil {
return err
return nil
}

err = actions.DsyncSendBlocks(r.node, location, sessionID, dagInfo.Manifest, dagDiff)
if err != nil {
return err
if err := push.Do(context.Background()); err != nil {
return nil
}

err = actions.DsyncCompletePush(r.node, location, sessionID)
if err != nil {
return err
}
// sessionID, dagDiff, err := actions.DsyncStartPush(r.node, dagInfo, location, &ref)
// if err != nil {
// return err
// }

// err = actions.DsyncSendBlocks(r.node, location, sessionID, dagInfo.Manifest, dagDiff)
// if err != nil {
// return err
// }

// err = actions.DsyncCompletePush(r.node, location, sessionID)
// if err != nil {
// return err
// }

*out = true
return nil
}

// Receive is used to save a dataset when running as a remote. API only, not RPC or command-line.
func (r *RemoteRequests) Receive(p *ReceiveParams, res *ReceiveResult) (err error) {
func (r *RemoteMethods) Receive(p *ReceiveParams, res *ReceiveResult) (err error) {
if r.cli != nil {
return fmt.Errorf("receive cannot be called over RPC")
}
Expand Down Expand Up @@ -158,7 +152,7 @@ func (r *RemoteRequests) Receive(p *ReceiveParams, res *ReceiveResult) (err erro
}

// Complete is used to complete a dataset that has been pushed to this remote
func (r *RemoteRequests) Complete(p *CompleteParams, res *bool) (err error) {
func (r *RemoteMethods) Complete(p *CompleteParams, res *bool) (err error) {
sid := p.SessionID
session, ok := r.Sessions[sid]
if !ok {
Expand Down
4 changes: 3 additions & 1 deletion lib/remote_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@ func TestRemote(t *testing.T) {
if err != nil {
t.Fatal(err.Error())
}
req := NewRemoteRequests(node, cfg, nil)

inst := &Instance{ node: node, cfg: cfg }
req := NewRemoteMethods(inst)
req.Receivers = dsync.NewTestReceivers()

exampleDagInfo := &dag.Info{
Expand Down

0 comments on commit e6341fc

Please sign in to comment.