Skip to content

Commit

Permalink
feat(bsync): initial block-sync sketched out
Browse files Browse the repository at this point in the history
  • Loading branch information
b5 committed Dec 5, 2018
1 parent 285d409 commit 6718e69
Show file tree
Hide file tree
Showing 5 changed files with 270 additions and 58 deletions.
48 changes: 28 additions & 20 deletions bsync/bsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (
"gx/ipfs/QmPSQnBKM9g7BaUcZCvswUJVscQ1ipjmwxN5PXCjkp9EQ7/go-cid"
ipld "gx/ipfs/QmR7TcHkR9nxkUorfi8XMTAMLUK7GiP64TWWBzY3aacc1o/go-ipld-format"
coreiface "gx/ipfs/QmUJYo4etAQqFfSS2rarFAE97eNGB8ej64YkRT2SmsYD4r/go-ipfs/core/coreapi/interface"
coreopt "gx/ipfs/QmUJYo4etAQqFfSS2rarFAE97eNGB8ej64YkRT2SmsYD4r/go-ipfs/core/coreapi/interface/options"
)

// Completion tracks progress of a sync task against a manifest.
Expand Down Expand Up @@ -119,13 +118,21 @@ type Send struct {
responses chan Response
}

const defaultSendParallelism = 4

// NewSend gets a local path to a remote place using a local NodeGetter and a remote
func NewSend(ctx context.Context, lng ipld.NodeGetter, mfst *manifest.Manifest, remote Remote) (*Send, error) {
parallelism := defaultSendParallelism
if len(mfst.Nodes) < parallelism {
parallelism = len(mfst.Nodes)
}

ps := &Send{
ctx: ctx,
mfst: mfst,
lng: lng,
remote: remote,
parallelism: 4,
parallelism: parallelism,
blocksCh: make(chan string, 8),
progCh: make(chan Completion, 8),
responses: make(chan Response),
Expand All @@ -141,11 +148,11 @@ func (snd *Send) Do() (err error) {
}

snd.prog = NewCompletion(snd.mfst, snd.diff)
go snd.updateCompletion()
go snd.completionChanged()

// create senders
sends := make([]sender, snd.parallelism)
for i := 0; i <= snd.parallelism; i++ {
for i := 0; i < snd.parallelism; i++ {
sends[i] = sender{
sid: snd.sid,
ctx: snd.ctx,
Expand All @@ -172,7 +179,7 @@ func (snd *Send) Do() (err error) {
snd.prog[i] = 100
}
}
go snd.updateCompletion()
go snd.completionChanged()
if snd.prog.Complete() {
errCh <- nil
return
Expand Down Expand Up @@ -205,7 +212,7 @@ func (snd *Send) Completion() <-chan Completion {
return snd.progCh
}

func (snd *Send) updateCompletion() {
func (snd *Send) completionChanged() {
snd.progCh <- snd.prog
}

Expand Down Expand Up @@ -262,16 +269,15 @@ type Receive struct {
sid string
ctx context.Context
lng ipld.NodeGetter
dag coreiface.DagAPI
batch coreiface.DagBatch
bapi coreiface.BlockAPI
mfst *manifest.Manifest
diff *manifest.Manifest
prog Completion
progCh chan Completion
}

// NewReceive creates a receive state machine
func NewReceive(ctx context.Context, lng ipld.NodeGetter, dag coreiface.DagAPI, mfst *manifest.Manifest) (*Receive, error) {
func NewReceive(ctx context.Context, lng ipld.NodeGetter, bapi coreiface.BlockAPI, mfst *manifest.Manifest) (*Receive, error) {
diff, err := base.Missing(ctx, lng, mfst)
if err != nil {
return nil, err
Expand All @@ -281,26 +287,21 @@ func NewReceive(ctx context.Context, lng ipld.NodeGetter, dag coreiface.DagAPI,
sid: randStringBytesMask(10),
ctx: ctx,
lng: lng,
dag: dag,
batch: dag.Batch(ctx),
bapi: bapi,
mfst: mfst,
diff: diff,
prog: NewCompletion(mfst, diff),
progCh: make(chan Completion),
}

go r.updateCompletion()
go r.completionChanged()

return r, nil
}

// ReceiveBlock accepts a block from the sender, placing it in the local blockstore
func (r *Receive) ReceiveBlock(hash string, data io.Reader) Response {
path, err := r.batch.Put(r.ctx, data, coreopt.DagPutOption(func(o *coreopt.DagPutSettings) error {
o.InputEnc = "raw"
o.Codec = cid.DagProtobuf
return nil
}))
bstat, err := r.bapi.Put(r.ctx, data)

if err != nil {
return Response{
Expand All @@ -310,23 +311,30 @@ func (r *Receive) ReceiveBlock(hash string, data io.Reader) Response {
}
}

if path.Cid().String() != hash {
id := bstat.Path().Cid()
if id.String() != hash {
return Response{
Hash: hash,
Status: StatusErrored,
Err: fmt.Errorf("hash mismatch. expected: '%s', got: '%s'", hash, path.Cid().String()),
Err: fmt.Errorf("hash mismatch. expected: '%s', got: '%s'", hash, id.String()),
}
}

// this should be the only place that modifies progress
for i, h := range r.mfst.Nodes {
if hash == h {
r.prog[i] = 100
}
}
go r.completionChanged()

return Response{
Hash: hash,
Status: StatusOk,
}
}

func (r *Receive) updateCompletion() {
func (r *Receive) completionChanged() {
r.progCh <- r.prog
}

Expand Down
147 changes: 114 additions & 33 deletions bsync/bsync_test.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,21 @@
package bsync

// import (
// "context"
// "fmt"
// "strconv"

// "github.com/multiformats/go-multihash"

// "gx/ipfs/QmPSQnBKM9g7BaUcZCvswUJVscQ1ipjmwxN5PXCjkp9EQ7/go-cid"
// ipld "gx/ipfs/QmR7TcHkR9nxkUorfi8XMTAMLUK7GiP64TWWBzY3aacc1o/go-ipld-format"
// )
import (
"bytes"
"context"
"fmt"
"io/ioutil"
"strings"
"testing"

"github.com/qri-io/qri/manifest"

"gx/ipfs/QmPSQnBKM9g7BaUcZCvswUJVscQ1ipjmwxN5PXCjkp9EQ7/go-cid"
ipld "gx/ipfs/QmR7TcHkR9nxkUorfi8XMTAMLUK7GiP64TWWBzY3aacc1o/go-ipld-format"
coreiface "gx/ipfs/QmUJYo4etAQqFfSS2rarFAE97eNGB8ej64YkRT2SmsYD4r/go-ipfs/core/coreapi/interface"
files "gx/ipfs/QmZMWMvWMVKCbHetJ4RgndbuEF1io2UpUxwQwtNjtYPzSC/go-ipfs-files"
// coreopt "gx/ipfs/QmUJYo4etAQqFfSS2rarFAE97eNGB8ej64YkRT2SmsYD4r/go-ipfs/core/coreapi/interface/options"
)

// type node struct {
// cid *cid.Cid
Expand Down Expand Up @@ -82,26 +88,101 @@ package bsync
// }
// }

// type TestNodeGetter struct {
// Nodes []ipld.Node
// }

// var _ ipld.NodeGetter = (*TestNodeGetter)(nil)

// func (ng TestNodeGetter) Get(_ context.Context, id cid.Cid) (ipld.Node, error) {
// for _, node := range ng.Nodes {
// if id.Equals(node.Cid()) {
// return node, nil
// }
// }
// return nil, fmt.Errorf("cid not found: %s", id.String())
// }

// // GetMany returns a channel of NodeOptions given a set of CIDs.
// func (ng TestNodeGetter) GetMany(context.Context, []cid.Cid) <-chan *ipld.NodeOption {
// ch := make(chan *ipld.NodeOption)
// ch <- &ipld.NodeOption{
// Err: fmt.Errorf("doesn't support GetMany"),
// }
// return ch
// }
type TestNodeGetter struct {
Nodes []ipld.Node
}

var _ ipld.NodeGetter = (*TestNodeGetter)(nil)

func (ng TestNodeGetter) Get(_ context.Context, id cid.Cid) (ipld.Node, error) {
for _, node := range ng.Nodes {
if id.Equals(node.Cid()) {
return node, nil
}
}
return nil, fmt.Errorf("cid not found: %s", id.String())
}

// GetMany returns a channel of NodeOptions given a set of CIDs.
func (ng TestNodeGetter) GetMany(context.Context, []cid.Cid) <-chan *ipld.NodeOption {
ch := make(chan *ipld.NodeOption)
ch <- &ipld.NodeOption{
Err: fmt.Errorf("doesn't support GetMany"),
}
return ch
}

func TestSync(t *testing.T) {
ctx := context.Background()
_, a, err := makeAPI(ctx)
if err != nil {
t.Fatal(err)
}

_, b, err := makeAPI(ctx)
if err != nil {
t.Fatal(err)
}

f := files.NewReaderFile("oh_hey", "oh_hey", ioutil.NopCloser(strings.NewReader("y"+strings.Repeat("o", 35000))), nil)
path, err := a.Unixfs().Add(ctx, f)
if err != nil {
t.Fatal(err)
}

aGetter := &manifest.NodeGetter{Dag: a.Dag()}
mfst, err := manifest.NewManifest(ctx, aGetter, path.Cid())
if err != nil {
t.Fatal(err)
}

bGetter := &manifest.NodeGetter{Dag: b.Dag()}
receive, err := NewReceive(ctx, bGetter, b.Block(), mfst)
if err != nil {
t.Fatal(err)
}

rem := &remote{
receive: receive,
lng: bGetter,
bapi: b.Block(),
}

send, err := NewSend(ctx, aGetter, mfst, rem)
if err != nil {
t.Fatal(err)
}

if err := send.Do(); err != nil {
t.Error(err)
}

// b should now be able to generate a manifest
_, err = manifest.NewManifest(ctx, bGetter, path.Cid())
if err != nil {
t.Error(err)
}
}

type remote struct {
receive *Receive
lng ipld.NodeGetter
bapi coreiface.BlockAPI
}

// Remotes must be "pushable"
func (r *remote) ReqSend(mfst *manifest.Manifest) (sid string, diff *manifest.Manifest, err error) {
ctx := context.Background()
r.receive, err = NewReceive(ctx, r.lng, r.bapi, mfst)
if err != nil {
return
}
sid = r.receive.sid
diff = r.receive.diff
return
}

// SendBlock
func (r *remote) SendBlock(sid, hash string, data []byte) Response {
return r.receive.ReceiveBlock(hash, bytes.NewReader(data))
}
Loading

0 comments on commit 6718e69

Please sign in to comment.