Skip to content

Commit

Permalink
feat(bsync): initial Receivers implementation, HTTP support
Browse files Browse the repository at this point in the history
  • Loading branch information
b5 committed Dec 5, 2018
1 parent a3783a1 commit 2e986f6
Show file tree
Hide file tree
Showing 4 changed files with 250 additions and 106 deletions.
19 changes: 13 additions & 6 deletions bsync/bsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,12 @@ const (

// Remote is an interface for a source that can be synced to
type Remote interface {
// Remotes must be "pushable"
ReqSend(mfst *manifest.Manifest) (sid string, diff *manifest.Manifest, err error)
// SendBlock
SendBlock(sid, hash string, data []byte) Response
// ReqSession requests a new session from the remote, which will return a
// delta manifest of blocks the remote needs and a session id that must
// be sent with each block
ReqSession(mfst *manifest.Manifest) (sid string, diff *manifest.Manifest, err error)
// PutBlock places a block on the remote
PutBlock(sid, hash string, data []byte) Response
}

// Send coordinates sending a manifest to a receiver, tracking progress and state
Expand Down Expand Up @@ -82,7 +84,7 @@ func NewSend(ctx context.Context, lng ipld.NodeGetter, mfst *manifest.Manifest,

// Do executes the send, blocking until complete
func (snd *Send) Do() (err error) {
snd.sid, snd.diff, err = snd.remote.ReqSend(snd.mfst)
snd.sid, snd.diff, err = snd.remote.ReqSession(snd.mfst)
if err != nil {
return err
}
Expand Down Expand Up @@ -189,7 +191,7 @@ func (s sender) start() {
}
continue
}
s.responses <- s.remote.SendBlock(s.sid, hash, node.RawData())
s.responses <- s.remote.PutBlock(s.sid, hash, node.RawData())
case <-s.stopCh:
return
case <-s.ctx.Done():
Expand Down Expand Up @@ -274,6 +276,11 @@ func (r *Receive) ReceiveBlock(hash string, data io.Reader) Response {
}
}

// Complete returns if this receive session is finished or not
func (r *Receive) Complete() bool {
return r.prog.Complete()
}

func (r *Receive) completionChanged() {
r.progCh <- r.prog
}
Expand Down
127 changes: 27 additions & 100 deletions bsync/bsync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,104 +14,8 @@ import (
ipld "gx/ipfs/QmR7TcHkR9nxkUorfi8XMTAMLUK7GiP64TWWBzY3aacc1o/go-ipld-format"
coreiface "gx/ipfs/QmUJYo4etAQqFfSS2rarFAE97eNGB8ej64YkRT2SmsYD4r/go-ipfs/core/coreapi/interface"
files "gx/ipfs/QmZMWMvWMVKCbHetJ4RgndbuEF1io2UpUxwQwtNjtYPzSC/go-ipfs-files"
// coreopt "gx/ipfs/QmUJYo4etAQqFfSS2rarFAE97eNGB8ej64YkRT2SmsYD4r/go-ipfs/core/coreapi/interface/options"
)

// type node struct {
// cid *cid.Cid
// size uint64
// links []*node
// }

// func (n node) String() string { return n.cid.String() }
// func (n node) Cid() cid.Cid { return *n.cid }
// func (n node) Size() (uint64, error) { return n.size, nil }
// func (n node) Links() (links []*ipld.Link) {
// for _, l := range n.links {
// links = append(links, &ipld.Link{
// Size: l.size,
// Cid: l.Cid(),
// })
// }
// return
// }

// // Not needed for manifest test:
// func (n node) Loggable() map[string]interface{} { return nil }
// func (n node) Copy() ipld.Node { return nil }
// func (n node) RawData() []byte { return nil }
// func (n node) Resolve(path []string) (interface{}, []string, error) { return nil, nil, nil }
// func (n node) ResolveLink(path []string) (*ipld.Link, []string, error) { return nil, nil, nil }
// func (n node) Stat() (*ipld.NodeStat, error) { return nil, nil }
// func (n node) Tree(path string, depth int) []string { return nil }

// func NewGraph(layers []layer) (list []ipld.Node) {
// root := newNode(2 * kb)
// list = append(list, root)
// insert(root, layers, &list)
// return
// }

// func insert(n *node, layers []layer, list *[]ipld.Node) {
// if len(layers) > 0 {
// for i := 0; i < layers[0].numChildren; i++ {
// ch := newNode(layers[0].size)
// n.links = append(n.links, ch)
// *list = append(*list, ch)
// insert(ch, layers[1:], list)
// }
// }
// }

// // monotonic content counter for unique, consistent cids
// var content = 0

// func newNode(size uint64) *node {
// // Create a cid manually by specifying the 'prefix' parameters
// pref := cid.Prefix{
// Version: 1,
// Codec: cid.Raw,
// MhType: multihash.SHA2_256,
// MhLength: -1, // default length
// }

// // And then feed it some data
// c, err := pref.Sum([]byte(strconv.Itoa(content)))
// if err != nil {
// panic(err)
// }

// content++
// return &node{
// cid: &c,
// size: size,
// }
// }

type TestNodeGetter struct {
Nodes []ipld.Node
}

var _ ipld.NodeGetter = (*TestNodeGetter)(nil)

func (ng TestNodeGetter) Get(_ context.Context, id cid.Cid) (ipld.Node, error) {
for _, node := range ng.Nodes {
if id.Equals(node.Cid()) {
return node, nil
}
}
return nil, fmt.Errorf("cid not found: %s", id.String())
}

// GetMany returns a channel of NodeOptions given a set of CIDs.
func (ng TestNodeGetter) GetMany(context.Context, []cid.Cid) <-chan *ipld.NodeOption {
ch := make(chan *ipld.NodeOption)
ch <- &ipld.NodeOption{
Err: fmt.Errorf("doesn't support GetMany"),
}
return ch
}

func TestSync(t *testing.T) {
ctx := context.Background()
_, a, err := makeAPI(ctx)
Expand Down Expand Up @@ -164,14 +68,38 @@ func TestSync(t *testing.T) {
}
}

type TestNodeGetter struct {
Nodes []ipld.Node
}

var _ ipld.NodeGetter = (*TestNodeGetter)(nil)

func (ng TestNodeGetter) Get(_ context.Context, id cid.Cid) (ipld.Node, error) {
for _, node := range ng.Nodes {
if id.Equals(node.Cid()) {
return node, nil
}
}
return nil, fmt.Errorf("cid not found: %s", id.String())
}

// GetMany returns a channel of NodeOptions given a set of CIDs.
func (ng TestNodeGetter) GetMany(context.Context, []cid.Cid) <-chan *ipld.NodeOption {
ch := make(chan *ipld.NodeOption)
ch <- &ipld.NodeOption{
Err: fmt.Errorf("doesn't support GetMany"),
}
return ch
}

// remote implements the Remote interface on a single receive session at a time
type remote struct {
receive *Receive
lng ipld.NodeGetter
bapi coreiface.BlockAPI
}

// Remotes must be "pushable"
func (r *remote) ReqSend(mfst *manifest.Manifest) (sid string, diff *manifest.Manifest, err error) {
func (r *remote) ReqSession(mfst *manifest.Manifest) (sid string, diff *manifest.Manifest, err error) {
ctx := context.Background()
r.receive, err = NewReceive(ctx, r.lng, r.bapi, mfst)
if err != nil {
Expand All @@ -182,7 +110,6 @@ func (r *remote) ReqSend(mfst *manifest.Manifest) (sid string, diff *manifest.Ma
return
}

// SendBlock
func (r *remote) SendBlock(sid, hash string, data []byte) Response {
func (r *remote) PutBlock(sid, hash string, data []byte) Response {
return r.receive.ReceiveBlock(hash, bytes.NewReader(data))
}
80 changes: 80 additions & 0 deletions bsync/http.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package bsync

import (
"bytes"
"encoding/json"
"fmt"
"net/http"

"github.com/qri-io/qri/manifest"
)

// HTTPRemote implents the Remote interface via HTTP POST requests
type HTTPRemote struct {
URL string
}

// ReqSend initiates a send session
func (rem *HTTPRemote) ReqSend(mfst *manifest.Manifest) (sid string, diff *manifest.Manifest, err error) {
buf := &bytes.Buffer{}
if err = json.NewEncoder(buf).Encode(mfst); err != nil {
return
}

req, err := http.NewRequest("POST", rem.URL, buf)
if err != nil {
return
}
req.Header.Set("Content-Type", "application/json")

res, err := http.DefaultClient.Do(req)
if err != nil {
return
}

if res.StatusCode != http.StatusOK {
err = fmt.Errorf("remote repsonse: %d", res.StatusCode)
return
}

sid = res.Header.Get("sid")
diff = &manifest.Manifest{}
err = json.NewDecoder(res.Body).Decode(diff)

return
}

// SendBlock sends a block over HTTP to a remote source
func (rem *HTTPRemote) SendBlock(sid, hash string, data []byte) Response {
url := fmt.Sprintf("%s?sid=%s&hash=%s", rem.URL, sid, hash)
req, err := http.NewRequest("PUT", url, bytes.NewBuffer(data))
if err != nil {
return Response{
Hash: hash,
Status: StatusErrored,
Err: err,
}
}
req.Header.Set("Content-Type", "application/octet-stream")

res, err := http.DefaultClient.Do(req)
if err != nil {
return Response{
Hash: hash,
Status: StatusErrored,
Err: err,
}
}

if res.StatusCode != http.StatusOK {
return Response{
Hash: hash,
Status: StatusRetry,
}
}

return Response{
Hash: hash,
Status: StatusOk,
}
}
Loading

0 comments on commit 2e986f6

Please sign in to comment.