diff --git a/actions/manifest.go b/actions/manifest.go
index 5a65c35f3..862a14b1e 100644
--- a/actions/manifest.go
+++ b/actions/manifest.go
@@ -1,56 +1,41 @@
 package actions
 
 import (
-	"context"
-
+	"github.com/qri-io/qri/base"
 	"github.com/qri-io/qri/manifest"
 	"github.com/qri-io/qri/p2p"
 
-	"gx/ipfs/QmPSQnBKM9g7BaUcZCvswUJVscQ1ipjmwxN5PXCjkp9EQ7/go-cid"
 	ipld "gx/ipfs/QmR7TcHkR9nxkUorfi8XMTAMLUK7GiP64TWWBzY3aacc1o/go-ipld-format"
 	"gx/ipfs/QmUJYo4etAQqFfSS2rarFAE97eNGB8ej64YkRT2SmsYD4r/go-ipfs/core/coreapi"
-	coreiface "gx/ipfs/QmUJYo4etAQqFfSS2rarFAE97eNGB8ej64YkRT2SmsYD4r/go-ipfs/core/coreapi/interface"
 )
 
 // NewManifest generates a manifest for a given node
 func NewManifest(node *p2p.QriNode, path string) (*manifest.Manifest, error) {
-	ipfsn, err := node.IPFSNode()
+	ng, err := newNodeGetter(node)
 	if err != nil {
 		return nil, err
 	}
 
-	id, err := cid.Parse(path)
+	return base.NewManifest(node.Context(), ng, path)
+}
+
+// Missing returns a manifest describing blocks that are not in this node for a given manifest
+func Missing(node *p2p.QriNode, m *manifest.Manifest) (missing *manifest.Manifest, err error) {
+	ng, err := newNodeGetter(node)
 	if err != nil {
 		return nil, err
 	}
 
-	ng := &nodeGetter{dag: coreapi.NewCoreAPI(ipfsn).Dag()}
-	return manifest.NewManifest(node.Context(), ng, id)
+	return base.Missing(node.Context(), ng, m)
 }
 
-type nodeGetter struct {
-	dag coreiface.DagAPI
-}
-
-// Get retrieves nodes by CID. Depending on the NodeGetter
-// implementation, this may involve fetching the Node from a remote
-// machine; consider setting a deadline in the context.
-func (ng *nodeGetter) Get(ctx context.Context, id cid.Cid) (ipld.Node, error) {
-	path, err := coreiface.ParsePath(id.String())
+// newNodeGetter generates an ipld.NodeGetter from a QriNode
+func newNodeGetter(node *p2p.QriNode) (ng ipld.NodeGetter, err error) {
+	ipfsn, err := node.IPFSNode()
 	if err != nil {
 		return nil, err
 	}
-	return ng.dag.Get(ctx, path)
-}
 
-// GetMany returns a channel of NodeOptions given a set of CIDs.
-func (ng *nodeGetter) GetMany(ctx context.Context, cids []cid.Cid) <-chan *ipld.NodeOption {
-	ch := make(chan *ipld.NodeOption)
-	go func() {
-		for _, id := range cids {
-			n, err := ng.Get(ctx, id)
-			ch <- &ipld.NodeOption{Err: err, Node: n}
-		}
-	}()
-	return ch
+	ng = &manifest.NodeGetter{Dag: coreapi.NewCoreAPI(ipfsn).Dag()}
+	return
 }
diff --git a/base/manifest.go b/base/manifest.go
new file mode 100644
index 000000000..8f43f8d3d
--- /dev/null
+++ b/base/manifest.go
@@ -0,0 +1,41 @@
+package base
+
+import (
+	"context"
+
+	"github.com/qri-io/qri/manifest"
+
+	"gx/ipfs/QmPSQnBKM9g7BaUcZCvswUJVscQ1ipjmwxN5PXCjkp9EQ7/go-cid"
+	ipld "gx/ipfs/QmR7TcHkR9nxkUorfi8XMTAMLUK7GiP64TWWBzY3aacc1o/go-ipld-format"
+)
+
+// NewManifest generates a manifest for a given node
+func NewManifest(ctx context.Context, ng ipld.NodeGetter, path string) (*manifest.Manifest, error) {
+	id, err := cid.Parse(path)
+	if err != nil {
+		return nil, err
+	}
+
+	return manifest.NewManifest(ctx, ng, id)
+}
+
+// Missing returns a manifest describing blocks that are not in this node for a given manifest.
+// Identifiers in the result are spelled exactly as they appear in m.Nodes, so
+// callers can match them against the input manifest by string comparison
+func Missing(ctx context.Context, ng ipld.NodeGetter, m *manifest.Manifest) (missing *manifest.Manifest, err error) {
+	var nodes []string
+
+	for _, idstr := range m.Nodes {
+		id, err := cid.Parse(idstr)
+		if err != nil {
+			return nil, err
+		}
+		if _, err := ng.Get(ctx, id); err == ipld.ErrNotFound {
+			// keep the caller's spelling, id.String() may re-encode the CID
+			nodes = append(nodes, idstr)
+		} else if err != nil {
+			return nil, err
+		}
+	}
+	return &manifest.Manifest{Nodes: nodes}, nil
+}
diff --git a/bsync/bsync.go
b/bsync/bsync.go
new file mode 100644
index 000000000..938ddf479
--- /dev/null
+++ b/bsync/bsync.go
@@ -0,0 +1,372 @@
+// Package bsync implements point-to-point block-syncing between a local and remote source
+// it's like rsync, but specific to block based content addressed systems
+package bsync
+
+import (
+	"context"
+	"fmt"
+	"io"
+	"math/rand"
+
+	"github.com/qri-io/qri/base"
+	"github.com/qri-io/qri/manifest"
+
+	"gx/ipfs/QmPSQnBKM9g7BaUcZCvswUJVscQ1ipjmwxN5PXCjkp9EQ7/go-cid"
+	ipld "gx/ipfs/QmR7TcHkR9nxkUorfi8XMTAMLUK7GiP64TWWBzY3aacc1o/go-ipld-format"
+	coreiface "gx/ipfs/QmUJYo4etAQqFfSS2rarFAE97eNGB8ej64YkRT2SmsYD4r/go-ipfs/core/coreapi/interface"
+)
+
+// Progress tracks completion of a sync task. each element in the slice represents
+// a block, the element itself must be a number from 0-100 representing the
+// transmission completion of that block. 0 = nothing sent, 100 = finished.
+type Progress []uint16
+
+// Percentage expresses the completion of all blocks as a number from 0-100.
+// An empty Progress has nothing left to transfer, so it reports 100 to agree with Complete
+func (p Progress) Percentage() (pct float32) {
+	if len(p) == 0 {
+		// avoid dividing by zero, which would yield NaN
+		return 100
+	}
+	for _, bl := range p {
+		pct += float32(bl) / float32(100)
+	}
+	return (pct / float32(len(p))) * 100
+}
+
+// CompletedBlocks returns the number of blocks that are completed
+func (p Progress) CompletedBlocks() (count int) {
+	for _, bl := range p {
+		if bl == 100 {
+			count++
+		}
+	}
+	return count
+}
+
+// Complete returns whether progress is finished
+func (p Progress) Complete() bool {
+	for _, bl := range p {
+		if bl != 100 {
+			return false
+		}
+	}
+	return true
+}
+
+// Response defines the result of sending a block, or attempting to send a block
+type Response struct {
+	Hash   string
+	Status ResponseStatus
+	Err    error
+}
+
+// ResponseStatus defines types of results for a request
+type ResponseStatus int
+
+const (
+	// StatusErrored indicates the request failed and cannot be retried
+	StatusErrored ResponseStatus = -1
+	// StatusOk indicates the request completed successfully
+	StatusOk ResponseStatus = 0
+	// StatusRetry indicates the request can be attempted again
+	StatusRetry ResponseStatus = 1
+)
+
+// Remote is an interface for a source that can be synced to
+type Remote interface {
+	// Remotes must be "pushable"
+	ReqSend(mfst *manifest.Manifest) (sid string, diff *manifest.Manifest, err error)
+	// SendBlock
+	SendBlock(sid, hash string, data []byte) Response
+}
+
+// Send coordinates sending a manifest to a receiver, tracking progress and state
+type Send struct {
+	sid         string             // session ID for this push, generated by receiver
+	ctx         context.Context    // session context
+	mfst        *manifest.Manifest // manifest we're sending
+	diff        *manifest.Manifest // returned difference
+	lng         ipld.NodeGetter    // local NodeGetter (Block Getter)
+	remote      Remote             // place we're sending to
+	parallelism int                // number of "tracks" for sending along
+	prog        Progress           // progress state
+	progCh      chan Progress
+	blocksCh    chan string
+	responses   chan Response
+}
+
+// sender is a parallelizable, stateless struct that sends blocks
+type sender struct {
+	sid       string
+	ctx       context.Context
+	blocksCh  chan string
+	responses chan Response
+	stopCh    chan bool
+	lng       ipld.NodeGetter
+	remote    Remote
+}
+
+// start launches a goroutine that sends blocks named on blocksCh until the
+// sender is stopped or its context is cancelled
+func (s sender) start() {
+	go func() {
+		for {
+			select {
+			case hash := <-s.blocksCh:
+				s.respond(s.send(hash))
+			case <-s.stopCh:
+				return
+			case <-s.ctx.Done():
+				return
+			}
+		}
+	}()
+}
+
+// send reads a single block from the local NodeGetter and pushes it to the remote
+func (s sender) send(hash string) Response {
+	id, err := cid.Parse(hash)
+	if err != nil {
+		// a hash that doesn't parse can't be fetched, so report it and skip the lookup
+		return Response{Hash: hash, Status: StatusErrored, Err: err}
+	}
+	node, err := s.lng.Get(s.ctx, id)
+	if err != nil {
+		return Response{Hash: hash, Status: StatusErrored, Err: err}
+	}
+	return s.remote.SendBlock(s.sid, hash, node.RawData())
+}
+
+// respond delivers a response, giving up if the sender is stopped or cancelled
+// so a sender never blocks on a response nobody will read
+func (s sender) respond(r Response) {
+	select {
+	case s.responses <- r:
+	case <-s.stopCh:
+	case <-s.ctx.Done():
+	}
+}
+
+// stop halts the sender. stopCh is closed rather than sent on so stop never
+// blocks, which means stop must be called at most once per sender
+func (s sender) stop() {
+	close(s.stopCh)
+}
+
+// NewSend gets a local path to a remote place using a local NodeGetter and a remote
+func NewSend(ctx context.Context, lng ipld.NodeGetter, mfst *manifest.Manifest, remote Remote) (*Send, error) {
+	ps := &Send{
+		ctx:         ctx,
+		mfst:        mfst,
+		lng:         lng,
+		remote:      remote,
+		parallelism: 4,
+		blocksCh:    make(chan string, 8),
+		progCh:      make(chan Progress, 8),
+		responses:   make(chan Response),
+	}
+	return ps, nil
+}
+
+// Do executes the send, blocking until every missing block is sent, a block
+// fails, or the session context is cancelled
+func (snd *Send) Do() (err error) {
+	snd.sid, snd.diff, err = snd.remote.ReqSend(snd.mfst)
+	if err != nil {
+		return err
+	}
+	if snd.diff == nil {
+		return fmt.Errorf("remote returned no diff manifest")
+	}
+
+	// mark every block finished, then zero out the blocks the remote is missing
+	missing := make(map[string]struct{}, len(snd.diff.Nodes))
+	for _, hash := range snd.diff.Nodes {
+		missing[hash] = struct{}{}
+	}
+	snd.prog = make(Progress, len(snd.mfst.Nodes))
+	for i, hash := range snd.mfst.Nodes {
+		if _, ok := missing[hash]; !ok {
+			snd.prog[i] = 100
+		}
+	}
+	snd.updateProgress()
+
+	// with nothing to send no response would ever arrive, so finish now
+	if snd.prog.Complete() {
+		return nil
+	}
+
+	// cancelling on return stops every goroutine started below
+	ctx, cancel := context.WithCancel(snd.ctx)
+	defer cancel()
+
+	// create senders
+	sends := make([]sender, snd.parallelism)
+	for i := range sends {
+		sends[i] = sender{
+			sid:       snd.sid,
+			ctx:       ctx,
+			blocksCh:  snd.blocksCh,
+			responses: snd.responses,
+			stopCh:    make(chan bool),
+			lng:       snd.lng,
+			remote:    snd.remote,
+		}
+		sends[i].start()
+	}
+
+	// buffered so the response loop can report its result without blocking
+	errCh := make(chan error, 1)
+
+	// receive block responses
+	go func(sends []sender, errCh chan error) {
+		for {
+			select {
+			case r := <-snd.responses:
+				switch r.Status {
+				case StatusOk:
+					// this is the only place we should modify progress after creation
+					for i, hash := range snd.mfst.Nodes {
+						if r.Hash == hash {
+							snd.prog[i] = 100
+						}
+					}
+					snd.updateProgress()
+					if snd.prog.Complete() {
+						errCh <- nil
+						return
+					}
+				case StatusErrored:
+					for _, s := range sends {
+						s.stop()
+					}
+					errCh <- r.Err
+					// returning keeps a later error from stopping the senders twice
+					return
+				case StatusRetry:
+					// requeue from a goroutine: a full queue would otherwise
+					// deadlock this loop against senders waiting to respond
+					go snd.enqueue(ctx, r.Hash)
+				}
+			case <-ctx.Done():
+				return
+			}
+		}
+	}(sends, errCh)
+
+	// fill queue with missing blocks to kick off the send
+	go func() {
+		for _, hash := range snd.diff.Nodes {
+			if !snd.enqueue(ctx, hash) {
+				return
+			}
+		}
+	}()
+
+	// block until the response loop reports a result or the session is cancelled
+	select {
+	case err = <-errCh:
+		return err
+	case <-ctx.Done():
+		return ctx.Err()
+	}
+}
+
+// enqueue adds a block hash to the send queue, reporting false if the context
+// is cancelled before there is room for it
+func (snd *Send) enqueue(ctx context.Context, hash string) bool {
+	select {
+	case snd.blocksCh <- hash:
+		return true
+	case <-ctx.Done():
+		return false
+	}
+}
+
+// updateProgress publishes a snapshot of the current progress. The send never
+// waits on progress readers: when progCh is full the update is dropped
+func (snd *Send) updateProgress() {
+	prog := make(Progress, len(snd.prog))
+	copy(prog, snd.prog)
+	select {
+	case snd.progCh <- prog:
+	default:
+	}
+}
+
+// Receive tracks state of receiving a manifest of blocks from a remote
+type Receive struct {
+	sid       string
+	ctx       context.Context
+	lng       ipld.NodeGetter
+	dag       coreiface.DagAPI
+	batch     coreiface.DagBatch
+	mfst      *manifest.Manifest
+	diff      *manifest.Manifest
+	prog      Progress
+	responses chan Response
+	pch       chan Progress
+}
+
+// NewReceive creates a receive state machine
+func NewReceive(ctx context.Context, lng ipld.NodeGetter, dag coreiface.DagAPI, mfst *manifest.Manifest) (*Receive, error) {
+	diff, err := base.Missing(ctx, lng, mfst)
+	if err != nil {
+		return nil, err
+	}
+
+	r := &Receive{
+		sid:   randStringBytesMask(10),
+		ctx:   ctx,
+		lng:   lng,
+		dag:   dag,
+		batch: dag.Batch(ctx),
+		mfst:  mfst,
+		diff:  diff,
+	}
+
+	return r, nil
+}
+
+// ReceiveBlock accepts a block from the sender, placing it in the local blockstore
+func (r *Receive) ReceiveBlock(hash string, data io.Reader) Response {
+	// TODO - check hash
+	// r.batch.Put(r.ctx, )
+
+	return Response{
+		Hash:   hash,
+		Status: StatusErrored,
+		Err:    fmt.Errorf("not finished"),
+	}
+}
+
+// the best stack overflow answer evaarrr: https://stackoverflow.com/a/22892986/9416066
+const letterBytes = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"
+const (
+	letterIdxBits = 6                    // 6 bits to represent a letter index
+	letterIdxMask = 1<<letterIdxBits - 1 // All 1-bits, as many as letterIdxBits
+	letterIdxMax  = 63 / letterIdxBits   // # of letter indices fitting in 63 bits
+)
+
+// randStringBytesMask generates a random string of n letters drawn from letterBytes
+// NOTE(review): math/rand is not seeded anywhere in this file, if it is not
+// seeded elsewhere session IDs may repeat between runs - confirm
+func randStringBytesMask(n int) string {
+	b := make([]byte, n)
+	// A rand.Int63() generates 63 random bits, enough for letterIdxMax letters!
+	for i, cache, remain := n-1, rand.Int63(), letterIdxMax; i >= 0; {
+		if remain == 0 {
+			cache, remain = rand.Int63(), letterIdxMax
+		}
+		if idx := int(cache & letterIdxMask); idx < len(letterBytes) {
+			b[i] = letterBytes[idx]
+			i--
+		}
+		cache >>= letterIdxBits
+		remain--
+	}
+
+	return string(b)
+}
diff --git a/bsync/bsync_test.go b/bsync/bsync_test.go
new file mode 100644
index 000000000..69bdccfd1
--- /dev/null
+++ b/bsync/bsync_test.go
@@ -0,0 +1 @@
+package bsync
diff --git a/cmd/manifest.go b/cmd/manifest.go
index 6291cddac..d84e5d0ec 100644
--- a/cmd/manifest.go
+++ b/cmd/manifest.go
@@ -2,6 +2,8 @@ package cmd
 
 import (
 	"fmt"
+	"io/ioutil"
+	"path/filepath"
 	"strings"
 
 	"encoding/hex"
@@ -20,20 +22,39 @@ func NewManifestCommand(f Factory, ioStreams ioes.IOStreams) *cobra.Command {
 	cmd := &cobra.Command{
 		Use:    "manifest",
 		Hidden: true,
-		Short:  "generate a qri manifest",
-		// Annotations: map[string]string{
-		// 	"group": "dataset",
-		// },
+		Short:  "dataset manifest interaction",
+	}
+
+	get := &cobra.Command{
+		Use:   "get",
+		Short: "get one or more manifests for a given reference",
+		RunE: func(cmd *cobra.Command, args []string) error {
+			if err := o.Complete(f, args); err != nil {
+				return err
+			}
+			return o.Get()
+		},
+	}
+
+	missing := &cobra.Command{
+		Use:   "missing",
+		Short: "list blocks not present in this repo for a given manifest",
 		RunE: func(cmd *cobra.Command, args []string) error {
 			if err := o.Complete(f, args); err != nil {
 				return err
 			}
-			return o.Run()
+			return o.Missing()
 		},
 	}
 
-	cmd.Flags().StringVarP(&o.Format, "format", "f", "json", "set output format [json, yaml, cbor]")
-	cmd.Flags().BoolVar(&o.Pretty, "pretty", false, "print output without indentation, only applies to json format")
+	get.Flags().StringVar(&o.Format, "format", "json", "set output format [json, yaml, cbor]")
+	get.Flags().BoolVar(&o.Pretty, "pretty", false, "print output with indentation, only applies to json format")
+
+	missing.Flags().StringVar(&o.Format, "format", "json", "set output format [json, yaml, cbor]")
+	missing.Flags().BoolVar(&o.Pretty, "pretty", false, "print output with indentation, only applies to json format")
+	missing.Flags().StringVar(&o.File, "file", "", "manifest file")
+
+	cmd.AddCommand(get, missing)
 
 	return cmd
 }
@@ -45,6 +66,7 @@ type ManifestOptions struct {
 	Refs   []string
 	Format string
 	Pretty bool
+	File   string
 
 	DatasetRequests *lib.DatasetRequests
 }
@@ -56,8 +78,8 @@ func (o *ManifestOptions) Complete(f Factory, args []string) (err error) {
 	return
 }
 
-// Run executes the get command
-func (o *ManifestOptions) Run() (err error) {
+// Get executes the get command
+func (o *ManifestOptions) Get() (err error) {
 	mf := &manifest.Manifest{}
 	for _, refstr := range o.Refs {
 		if err = o.DatasetRequests.Manifest(&refstr, mf); err != nil {
@@ -87,3 +109,63 @@ func (o *ManifestOptions) Run() (err error) {
 
 	return err
 }
+
+// Missing executes the missing command
+func (o *ManifestOptions) Missing() error {
+	if o.File == "" {
+		return fmt.Errorf("manifest file is required")
+	}
+
+	in := &manifest.Manifest{}
+	data, err := ioutil.ReadFile(o.File)
+	if err != nil {
+		return err
+	}
+
+	switch strings.ToLower(filepath.Ext(o.File)) {
+	case ".yaml", ".yml":
+		err = yaml.Unmarshal(data, in)
+	case ".json":
+		err = json.Unmarshal(data, in)
+	case ".cbor":
+		// TODO - I'm not a fan of this hex tom-foolery
+		data, err = hex.DecodeString(string(data))
+		if err != nil {
+			return err
+		}
+		in, err = manifest.UnmarshalCBOR(data)
+	default:
+		return fmt.Errorf("unsupported manifest file extension: %q", filepath.Ext(o.File))
+	}
+
+	if err != nil {
+		return err
+	}
+
+	mf := &manifest.Manifest{}
+	if err = o.DatasetRequests.ManifestMissing(in, mf); err != nil {
+		return err
+	}
+
+	var buffer []byte
+	switch strings.ToLower(o.Format) {
+	case "json":
+		if !o.Pretty {
+			buffer, err = json.Marshal(mf)
+		} else {
+			buffer, err = json.MarshalIndent(mf, "", " ")
+		}
+	case "yaml":
+		buffer, err = yaml.Marshal(mf)
+	case "cbor":
+		var raw []byte
+		raw, err = mf.MarshalCBOR()
+		buffer = []byte(hex.EncodeToString(raw))
+	}
+	if err != nil {
+		return fmt.Errorf("error encoding manifest: %s", err)
+	}
+	_, err = o.Out.Write(buffer)
+
+	return err
+}
diff --git a/lib/datasets.go b/lib/datasets.go
index 0051bdcb6..fd7f2eb23 100644
--- a/lib/datasets.go
+++ b/lib/datasets.go
@@ -494,3 +494,18 @@ func (r *DatasetRequests) Manifest(refstr *string, m *manifest.Manifest) (err er
 	*m = *mf
 	return
 }
+
+// ManifestMissing generates a manifest of blocks that are not present on this repo for a given manifest
+func (r *DatasetRequests) ManifestMissing(a, b *manifest.Manifest) (err error) {
+	if r.cli != nil {
+		return r.cli.Call("DatasetRequests.ManifestMissing", a, b)
+	}
+
+	var mf *manifest.Manifest
+	mf, err = actions.Missing(r.node, a)
+	if err != nil {
+		return
+	}
+	*b = *mf
+	return
+}
diff --git a/manifest/manifest.go b/manifest/manifest.go
index a7f15cead..23562579f 100644
--- a/manifest/manifest.go
+++ b/manifest/manifest.go
@@ -24,11 +24,11 @@ type Node interface {
 // node identifiers are stored in a slice "nodes", all other slices reference
 // cids by index positions
 type Manifest struct {
-	Root  int              `json:"root"`           // index if CID in nodes list this manifest is about. The subject of the manifest
-	Nodes []string         `json:"nodes"`          // list if CIDS contained in the root dag
-	Links [][2]int         `json:"links"`          // links between nodes
-	Sizes []uint64         `json:"sizes"`          // sizes of nodes in bytes
-	Meta  map[string][]int `json:"meta,omitempty"` // meta are lists of logical sub-DAGs by positions in the nodes list
+	Links    [][2]int         `json:"links"`              // links between nodes
+	Sections map[string][]int `json:"sections,omitempty"` // sections are lists of logical sub-DAGs by positions in the nodes list
+	Nodes    []string         `json:"nodes"`              // list of CIDs contained in the root dag
+	Root     int              `json:"root"`               // index of CID in nodes list this manifest is about. The subject of the manifest
+	Sizes    []uint64         `json:"sizes"`              // sizes of nodes in bytes
 }
 
 // NewManifest generates a manifest from an ipld node
diff --git a/manifest/node_getter.go b/manifest/node_getter.go
new file mode 100644
index 000000000..cb76658bc
--- /dev/null
+++ b/manifest/node_getter.go
@@ -0,0 +1,45 @@
+package manifest
+
+import (
+	"context"
+
+	"gx/ipfs/QmPSQnBKM9g7BaUcZCvswUJVscQ1ipjmwxN5PXCjkp9EQ7/go-cid"
+	ipld "gx/ipfs/QmR7TcHkR9nxkUorfi8XMTAMLUK7GiP64TWWBzY3aacc1o/go-ipld-format"
+	coreiface "gx/ipfs/QmUJYo4etAQqFfSS2rarFAE97eNGB8ej64YkRT2SmsYD4r/go-ipfs/core/coreapi/interface"
+)
+
+// NodeGetter wraps the go-ipfs DagAPI to satisfy the IPLD NodeGetter interface
+type NodeGetter struct {
+	Dag coreiface.DagAPI
+}
+
+// Get retrieves nodes by CID. Depending on the NodeGetter
+// implementation, this may involve fetching the Node from a remote
+// machine; consider setting a deadline in the context.
+func (ng *NodeGetter) Get(ctx context.Context, id cid.Cid) (ipld.Node, error) {
+	path, err := coreiface.ParsePath(id.String())
+	if err != nil {
+		return nil, err
+	}
+	return ng.Dag.Get(ctx, path)
+}
+
+// GetMany returns a channel of NodeOptions given a set of CIDs.
+// The channel is closed once every CID has been attempted or ctx is cancelled
+func (ng *NodeGetter) GetMany(ctx context.Context, cids []cid.Cid) <-chan *ipld.NodeOption {
+	ch := make(chan *ipld.NodeOption)
+	go func() {
+		// close when finished so receivers ranging over the channel terminate
+		defer close(ch)
+		for _, id := range cids {
+			n, err := ng.Get(ctx, id)
+			select {
+			case ch <- &ipld.NodeOption{Err: err, Node: n}:
+			case <-ctx.Done():
+				// ctx was cancelled, exit instead of blocking on a send nobody may read
+				return
+			}
+		}
+	}()
+	return ch
+}