Refactor ipfs get
License: MIT
Signed-off-by: rht <rhtbot@gmail.com>
rht committed Aug 11, 2015
1 parent f105ce4 commit 45f99bd
Showing 4 changed files with 71 additions and 103 deletions.
57 changes: 8 additions & 49 deletions core/commands/get.go
@@ -1,7 +1,6 @@
package commands

import (
"bufio"
"compress/gzip"
"errors"
"fmt"
@@ -11,13 +10,11 @@ import (
"strings"

"github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/cheggaaa/pb"
context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"

cmds "github.com/ipfs/go-ipfs/commands"
core "github.com/ipfs/go-ipfs/core"
path "github.com/ipfs/go-ipfs/path"
tar "github.com/ipfs/go-ipfs/thirdparty/tar"
uio "github.com/ipfs/go-ipfs/unixfs/io"
utar "github.com/ipfs/go-ipfs/unixfs/tar"
)

@@ -64,15 +61,16 @@ may also specify the level of compression by specifying '-l=<1-9>'.
res.SetError(err, cmds.ErrNormal)
return
}

p := path.Path(req.Arguments()[0])
var reader io.Reader
if archive, _, _ := req.Option("archive").Bool(); !archive && cmplvl != gzip.NoCompression {
// only use this when the flag is '-C' without '-a'
reader, err = getZip(req.Context(), node, p, cmplvl)
} else {
reader, err = get(req.Context(), node, p, cmplvl)
ctx := req.Context()
dn, err := core.Resolve(ctx, node, p)
if err != nil {
res.SetError(err, cmds.ErrNormal)
return
}

archive, _, _ := req.Option("archive").Bool()
reader, err := utar.DagArchive(ctx, dn, p.String(), node.DAG, archive, cmplvl)
if err != nil {
res.SetError(err, cmds.ErrNormal)
return
@@ -192,42 +190,3 @@ func getCompressOptions(req cmds.Request) (int, error) {
}
return gzip.NoCompression, nil
}

func get(ctx context.Context, node *core.IpfsNode, p path.Path, compression int) (io.Reader, error) {
dn, err := core.Resolve(ctx, node, p)
if err != nil {
return nil, err
}

return utar.DagArchive(ctx, dn, p.String(), node.DAG, compression)
}

// getZip is equivalent to `ipfs getdag $hash | gzip`
func getZip(ctx context.Context, node *core.IpfsNode, p path.Path, compression int) (io.Reader, error) {
dagnode, err := core.Resolve(ctx, node, p)
if err != nil {
return nil, err
}

reader, err := uio.NewDagReader(ctx, dagnode, node.DAG)
if err != nil {
return nil, err
}

pr, pw := io.Pipe()
gw, err := gzip.NewWriterLevel(pw, compression)
if err != nil {
return nil, err
}
bufin := bufio.NewReader(reader)
go func() {
_, err := bufin.WriteTo(gw)
if err != nil {
log.Error("Fail to compress the stream")
}
gw.Close()
pw.Close()
}()

return pr, nil
}
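
The removed getZip above hand-rolled a gzip pipeline over an io.Pipe; the refactor moves that job into utar.DagArchive, which also propagates goroutine failures via CloseWithError instead of merely logging them. A minimal standalone sketch of the underlying pattern, using only the standard library and a hypothetical gzipStream helper (not the commit's actual API):

package main

import (
	"compress/gzip"
	"fmt"
	"io"
	"os"
	"strings"
)

// gzipStream returns a reader yielding the gzip-compressed bytes of r.
// A goroutine copies r through a gzip.Writer into one end of an io.Pipe
// while the caller consumes the other end.
func gzipStream(r io.Reader, level int) (io.Reader, error) {
	pr, pw := io.Pipe()
	gw, err := gzip.NewWriterLevel(pw, level)
	if err != nil {
		return nil, err
	}
	go func() {
		if _, err := io.Copy(gw, r); err != nil {
			// Surface the failure to the reader side, as the new
			// DagArchive does; the old getZip goroutine only logged it.
			pw.CloseWithError(err)
			return
		}
		gw.Close() // flush the gzip trailer before closing the pipe
		pw.Close()
	}()
	return pr, nil
}

func main() {
	zr, err := gzipStream(strings.NewReader("hello, ipfs"), gzip.BestCompression)
	if err != nil {
		fmt.Fprintln(os.Stderr, err)
		os.Exit(1)
	}
	n, err := io.Copy(io.Discard, zr) // drain the compressed stream
	fmt.Println(n, err)
}
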
2 changes: 1 addition & 1 deletion fuse/readonly/readonly_unix.go
@@ -180,7 +180,7 @@ func (s *Node) Read(ctx context.Context, req *fuse.ReadRequest, resp *fuse.ReadR
return err
}

buf := resp.Data[:min(req.Size, int(r.Size()-req.Offset))]
buf := resp.Data[:min(req.Size, int(int64(r.Size())-req.Offset))]
n, err := io.ReadFull(r, buf)
if err != nil && err != io.EOF {
return err
4 changes: 2 additions & 2 deletions unixfs/io/dagreader.go
@@ -137,8 +137,8 @@ func (dr *DagReader) precalcNextBuf(ctx context.Context) error {
}

// Size return the total length of the data from the DAG structured file.
func (dr *DagReader) Size() int64 {
return int64(dr.pbdata.GetFilesize())
func (dr *DagReader) Size() uint64 {
return dr.pbdata.GetFilesize()
}

// Read reads data from the DAG structured file
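
Size() changing from int64 to uint64 here is what forces the cast in readonly_unix.go above: subtracting a signed offset in unsigned arithmetic wraps around instead of going negative. A tiny illustration with made-up values:

package main

import "fmt"

func main() {
	size := uint64(10)  // file size, as DagReader.Size() now reports it
	offset := int64(20) // a read offset past end-of-file

	// Unsigned subtraction wraps to a huge positive value...
	fmt.Println(size - uint64(offset)) // 18446744073709551606

	// ...while casting to int64 first yields the true (possibly
	// negative) remaining byte count, which is what the fuse handler's
	// min() expression needs to size the buffer correctly.
	fmt.Println(int64(size) - offset) // -10
}
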
111 changes: 60 additions & 51 deletions unixfs/tar/writer.go
@@ -21,7 +21,8 @@ import (
// TODO: does this need to be configurable?
var DefaultBufSize = 1048576

func DagArchive(ctx cxt.Context, nd *mdag.Node, name string, dag mdag.DAGService, compression int) (io.Reader, error) {
// DagArchive is equivalent to `ipfs getdag $hash | maybe_tar | maybe_gzip`
func DagArchive(ctx cxt.Context, nd *mdag.Node, name string, dag mdag.DAGService, archive bool, compression int) (io.Reader, error) {

_, filename := path.Split(name)

@@ -31,15 +32,14 @@ func DagArchive(ctx cxt.Context, nd *mdag.Node, name string, dag mdag.DAGService
// use a buffered writer to parallelize task
bufw := bufio.NewWriterSize(pipew, DefaultBufSize)

// construct the tar writer
w, err := NewWriter(bufw, dag, compression)
w, err := NewWriter(ctx, dag, archive, compression, bufw)
if err != nil {
return nil, err
}

// write all the nodes recursively
go func() {
if err := w.WriteNode(ctx, nd, filename); err != nil {
if err := w.WriteNode(nd, filename); err != nil {
pipew.CloseWithError(err)
return
}
@@ -49,6 +49,7 @@ func DagArchive(ctx cxt.Context, nd *mdag.Node, name string, dag mdag.DAGService
return
}

w.Close()
pipew.Close() // everything seems to be ok.
}()

@@ -61,93 +62,101 @@ func DagArchive(ctx cxt.Context, nd *mdag.Node, name string, dag mdag.DAGService
type Writer struct {
Dag mdag.DAGService
TarW *tar.Writer
GzW *gzip.Writer
ctx cxt.Context

Close func() error
WriteFile func(*uio.DagReader, string) error
HandleDir func(string) error
}

// NewWriter wraps given io.Writer.
// compression determines whether to use gzip compression.
func NewWriter(w io.Writer, dag mdag.DAGService, compression int) (*Writer, error) {

if compression != gzip.NoCompression {
var err error
w, err = gzip.NewWriterLevel(w, compression)
if err != nil {
return nil, err
}
func NewWriter(ctx cxt.Context, dag mdag.DAGService, archive bool, compression int, iw io.Writer) (*Writer, error) {
gzw, err := gzip.NewWriterLevel(iw, compression)
if err != nil {
return nil, err
}

return &Writer{
Dag: dag,
TarW: tar.NewWriter(w),
}, nil
}

func (w *Writer) WriteDir(ctx cxt.Context, nd *mdag.Node, fpath string) error {
if err := writeDirHeader(w.TarW, fpath); err != nil {
return err
w := &Writer{
Dag: dag,
GzW: gzw,
ctx: ctx,
}

for i, ng := range w.Dag.GetDAG(ctx, nd) {
child, err := ng.Get(ctx)
if err != nil {
if !archive && compression != gzip.NoCompression {
// gz only case
w.WriteFile = func(dagr *uio.DagReader, fpath string) error {
_, err := dagr.WriteTo(w.GzW)
return err
}

npath := path.Join(fpath, nd.Links[i].Name)
if err := w.WriteNode(ctx, child, npath); err != nil {
w.HandleDir = func(fpath string) error {
return uio.ErrIsDir
}
w.Close = w.GzW.Close
} else {
// tar and maybe gz case
// construct the tar writer
w.TarW = tar.NewWriter(w.GzW)

w.WriteFile = func(dagr *uio.DagReader, fpath string) error {
if err := writeFileHeader(w.TarW, fpath, dagr.Size()); err != nil {
return err
}
_, err := dagr.WriteTo(w.TarW)
return err
}
}

return nil
}
w.HandleDir = func(fpath string) error {
return writeDirHeader(w.TarW, fpath)
}

func (w *Writer) WriteFile(ctx cxt.Context, nd *mdag.Node, fpath string) error {
pb := new(upb.Data)
if err := proto.Unmarshal(nd.Data, pb); err != nil {
return err
w.Close = w.TarW.Close
}

return w.writeFile(ctx, nd, pb, fpath)
return w, nil
}

func (w *Writer) writeFile(ctx cxt.Context, nd *mdag.Node, pb *upb.Data, fpath string) error {
if err := writeFileHeader(w.TarW, fpath, pb.GetFilesize()); err != nil {
func (w *Writer) writeDir(nd *mdag.Node, fpath string) error {
if err := w.HandleDir(fpath); err != nil {
return err
}

dagr, err := uio.NewDagReader(ctx, nd, w.Dag)
if err != nil {
return err
}
for i, ng := range w.Dag.GetDAG(w.ctx, nd) {
child, err := ng.Get(w.ctx)
if err != nil {
return err
}

_, err = io.Copy(w.TarW, dagr)
if err != nil && err != io.EOF {
return err
npath := path.Join(fpath, nd.Links[i].Name)
if err := w.WriteNode(child, npath); err != nil {
return err
}
}

return nil
}

func (w *Writer) WriteNode(ctx cxt.Context, nd *mdag.Node, fpath string) error {
func (w *Writer) WriteNode(nd *mdag.Node, fpath string) error {
pb := new(upb.Data)
if err := proto.Unmarshal(nd.Data, pb); err != nil {
return err
}

switch pb.GetType() {
case upb.Data_Directory:
return w.WriteDir(ctx, nd, fpath)
return w.writeDir(nd, fpath)
case upb.Data_File:
return w.writeFile(ctx, nd, pb, fpath)
dagr, err := uio.NewDagReader(w.ctx, nd, w.Dag)
if err != nil {
return err
}
return w.WriteFile(dagr, fpath)
default:
return fmt.Errorf("unixfs type not supported: %s", pb.GetType())
}
}

func (w *Writer) Close() error {
return w.TarW.Close()
}

func writeDirHeader(w *tar.Writer, fpath string) error {
return w.WriteHeader(&tar.Header{
Name: fpath,
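
The reworked Writer above picks its behavior once, at construction time, by storing Close, WriteFile, and HandleDir as function fields: gz-only mode streams file bytes straight into the gzip.Writer and rejects directories with uio.ErrIsDir, while tar mode writes headers and entries. A stripped-down, standalone sketch of that dispatch pattern, with hypothetical names and none of the DAG machinery:

package main

import (
	"errors"
	"fmt"
)

// writer mirrors the shape of the refactored tar.Writer: the strategy
// is chosen in the constructor and stored as closures, so traversal
// code never branches on the mode again.
type writer struct {
	writeFile func(name string) error
	handleDir func(name string) error
	close     func() error
}

func newWriter(archive bool) *writer {
	w := &writer{close: func() error { return nil }}
	if !archive {
		// gz-only case: single files stream fine, directories cannot.
		w.writeFile = func(name string) error {
			fmt.Println("gzip bytes of", name)
			return nil
		}
		w.handleDir = func(name string) error {
			return errors.New("cannot gzip a directory: " + name)
		}
	} else {
		// tar (and maybe gz) case: emit entries for both node kinds.
		w.writeFile = func(name string) error {
			fmt.Println("tar file entry:", name)
			return nil
		}
		w.handleDir = func(name string) error {
			fmt.Println("tar dir header:", name)
			return nil
		}
	}
	return w
}

func main() {
	w := newWriter(true)
	_ = w.handleDir("a")
	_ = w.writeFile("a/b.txt")
	_ = w.close()
}
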
