Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add PosInfo() method to blocks and merkledag nodes. #3309

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 16 additions & 3 deletions blocks/blocks.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,25 +6,30 @@ import (
"errors"
"fmt"

"github.com/ipfs/go-ipfs/commands/files"

cid "gx/ipfs/QmXUuRadqDq5BuFWzVU6VuKaSjTcNm1gNCtLvvP1TJCW4z/go-cid"
mh "gx/ipfs/QmYDds3421prZgqKbLpEK7T9Aa2eVdQ7o3YarX1LVLdP2J/go-multihash"
u "gx/ipfs/Qmb912gdngC1UWwTkhuW8knyRbcWeu5kqkxBpveLmW8bSr/go-ipfs-util"
)

// ErrWrongHash is returned when a block's data does not hash to its
// declared multihash.
var ErrWrongHash = errors.New("data did not match given hash")

// Block is a singular block of data in ipfs. It carries raw bytes together
// with their content address (multihash / CID) and optional position
// information recording where in a source file the data originated.
type Block interface {
	Multihash() mh.Multihash
	RawData() []byte
	PosInfo() *files.PosInfo
	Cid() *cid.Cid
	String() string
	Loggable() map[string]interface{}
}

// BasicBlock is the default, in-memory implementation of Block. It holds
// the raw data, its CID, and optional position info for the source file.
type BasicBlock struct {
	cid     *cid.Cid
	data    []byte
	posInfo *files.PosInfo
}

// NewBlock creates a Block object from opaque data. It will hash the data.
Expand Down Expand Up @@ -59,6 +64,14 @@ func (b *BasicBlock) Cid() *cid.Cid {
return b.cid
}

// PosInfo returns the position information (offset and originating file
// path) recorded for this block, or nil if none was set.
func (b *BasicBlock) PosInfo() *files.PosInfo {
	return b.posInfo
}

// SetPosInfo records where in a source file this block's data originated.
func (b *BasicBlock) SetPosInfo(posInfo *files.PosInfo) {
	b.posInfo = posInfo
}

// String implements fmt.Stringer, identifying the block by its CID.
func (b *BasicBlock) String() string {
	return fmt.Sprintf("[Block %s]", b.Cid())
}
Expand Down
11 changes: 11 additions & 0 deletions commands/files/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,3 +55,14 @@ type SizeFile interface {

Size() (int64, error)
}

// FileInfo exposes the full path and os.FileInfo of an underlying file,
// letting consumers of a reader recover metadata about its source.
type FileInfo interface {
	FullPath() string
	Stat() os.FileInfo
}

// PosInfo records the position of a piece of data within a source file.
type PosInfo struct {
	Offset   uint64      // byte offset of the data within the file
	FullPath string      // path of the originating file
	Stat     os.FileInfo // can be nil
}
12 changes: 11 additions & 1 deletion core/coreunix/add.go
Original file line number Diff line number Diff line change
Expand Up @@ -392,7 +392,12 @@ func (adder *Adder) addFile(file files.File) error {
// progress updates to the client (over the output channel)
var reader io.Reader = file
if adder.Progress {
reader = &progressReader{file: file, out: adder.Out}
rdr := &progressReader{file: file, out: adder.Out}
if fi, ok := file.(files.FileInfo); ok {
reader = &progressReader2{rdr, fi}
} else {
reader = rdr
}
}

dagnode, err := adder.add(reader)
Expand Down Expand Up @@ -515,3 +520,8 @@ func (i *progressReader) Read(p []byte) (int, error) {

return n, err
}

// progressReader2 combines a progressReader with the files.FileInfo
// interface (both embedded), so downstream consumers can type-assert the
// reader to files.FileInfo and recover position information.
type progressReader2 struct {
	*progressReader
	files.FileInfo
}
101 changes: 101 additions & 0 deletions core/coreunix/add_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,14 @@ import (
"bytes"
"io"
"io/ioutil"
"math/rand"
"os"
"testing"
"time"

"github.com/ipfs/go-ipfs/blocks"
"github.com/ipfs/go-ipfs/blocks/blockstore"
"github.com/ipfs/go-ipfs/blockservice"
"github.com/ipfs/go-ipfs/commands/files"
"github.com/ipfs/go-ipfs/core"
dag "github.com/ipfs/go-ipfs/merkledag"
Expand Down Expand Up @@ -162,3 +167,99 @@ func TestAddGCLive(t *testing.T) {
t.Fatal(err)
}
}

// TestAddWPosInfo adds a file through the adder and verifies, via a wrapping
// blockstore, that stored blocks carry the expected PosInfo.
func TestAddWPosInfo(t *testing.T) {
	r := &repo.Mock{
		C: config.Config{
			Identity: config.Identity{
				PeerID: "Qmfoo", // required by offline node
			},
		},
		D: testutil.ThreadSafeCloserMapDatastore(),
	}
	node, err := core.NewNode(context.Background(), &core.BuildCfg{Repo: r})
	if err != nil {
		t.Fatal(err)
	}

	bs := &testBlockstore{GCBlockstore: node.Blockstore, expectedPath: "/tmp/foo.txt", t: t}
	bserv := blockservice.New(bs, node.Exchange)
	dserv := dag.NewDAGService(bserv)
	adder, err := NewAdder(context.Background(), node.Pinning, bs, dserv)
	if err != nil {
		t.Fatal(err)
	}
	adder.Out = make(chan interface{})
	adder.Progress = true

	data := make([]byte, 5*1024*1024)
	rand.New(rand.NewSource(2)).Read(data) // Rand.Read never returns an error
	fileData := ioutil.NopCloser(bytes.NewBuffer(data))
	fileInfo := dummyFileInfo{"foo.txt", int64(len(data)), time.Now()}
	file := files.NewReaderFile("foo.txt", "/tmp/foo.txt", fileData, &fileInfo)

	go func() {
		defer close(adder.Out)
		// t.Fatal must not be called from a non-test goroutine; use t.Error
		// so the failure is recorded without terminating the wrong goroutine.
		if err := adder.AddFile(file); err != nil {
			t.Error(err)
		}
	}()
	for range adder.Out {
	}

	if bs.countAtOffsetZero != 2 {
		t.Fatalf("expected 2 blocks with an offset at zero (one root, and one leaf), got %d", bs.countAtOffsetZero)
	}
	if bs.countAtOffsetNonZero != 19 {
		// note: the exact number will depend on the size and the sharding algo. used
		t.Fatalf("expected 19 blocks with an offset > 0, got %d", bs.countAtOffsetNonZero)
	}
}

// testBlockstore wraps a GCBlockstore and, for every block written through
// it, tallies whether the block's PosInfo offset is zero or non-zero and
// checks the recorded path against expectedPath.
type testBlockstore struct {
	blockstore.GCBlockstore
	expectedPath         string
	t                    *testing.T
	countAtOffsetZero    int
	countAtOffsetNonZero int
}

// Put inspects the block's PosInfo before delegating to the wrapped store.
func (bs *testBlockstore) Put(block blocks.Block) error {
	bs.CheckForPosInfo(block)
	return bs.GCBlockstore.Put(block)
}

// PutMany inspects each block's PosInfo before delegating to the wrapped
// store. The parameter is named blks (not blocks) to avoid shadowing the
// imported blocks package.
func (bs *testBlockstore) PutMany(blks []blocks.Block) error {
	for _, blk := range blks {
		bs.CheckForPosInfo(blk)
	}
	return bs.GCBlockstore.PutMany(blks)
}

// CheckForPosInfo validates a block's PosInfo, if present: the recorded path
// must equal expectedPath, and the offset is tallied into the zero/non-zero
// counters. Blocks without PosInfo are ignored. It always returns nil; the
// error result is kept for signature compatibility with callers.
func (bs *testBlockstore) CheckForPosInfo(block blocks.Block) error {
	posInfo := block.PosInfo()
	if posInfo == nil {
		return nil
	}
	if posInfo.FullPath != bs.expectedPath {
		// Errorf, not Fatal: this runs on the adder's goroutine during
		// AddFile, and t.Fatal must not be called from a non-test goroutine.
		bs.t.Errorf("PosInfo has path %q, expected %q", posInfo.FullPath, bs.expectedPath)
	}
	if posInfo.Offset == 0 {
		bs.countAtOffsetZero++
	} else {
		bs.countAtOffsetNonZero++
	}
	return nil
}

// dummyFileInfo is a minimal os.FileInfo implementation used to supply
// fabricated file metadata to files.NewReaderFile in tests.
type dummyFileInfo struct {
	name    string
	size    int64
	modTime time.Time
}

// os.FileInfo implementation: mode is always 0, never a directory, and no
// underlying system data source.
func (fi *dummyFileInfo) Name() string       { return fi.name }
func (fi *dummyFileInfo) Size() int64        { return fi.size }
func (fi *dummyFileInfo) Mode() os.FileMode  { return 0 }
func (fi *dummyFileInfo) ModTime() time.Time { return fi.modTime }
func (fi *dummyFileInfo) IsDir() bool        { return false }
func (fi *dummyFileInfo) Sys() interface{}   { return nil }
12 changes: 9 additions & 3 deletions importer/balanced/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,12 @@ import (
)

func BalancedLayout(db *h.DagBuilderHelper) (*dag.Node, error) {
var offset uint64 = 0
var root *h.UnixfsNode
for level := 0; !db.Done(); level++ {

nroot := h.NewUnixfsNode()
db.SetPosInfo(nroot, 0)

// add our old root as a child of the new root.
if root != nil { // nil if it's the first node.
Expand All @@ -21,11 +23,13 @@ func BalancedLayout(db *h.DagBuilderHelper) (*dag.Node, error) {
}

// fill it up.
if err := fillNodeRec(db, nroot, level); err != nil {
if err := fillNodeRec(db, nroot, level, offset); err != nil {
return nil, err
}

offset = nroot.FileSize()
root = nroot

}
if root == nil {
root = h.NewUnixfsNode()
Expand All @@ -49,7 +53,7 @@ func BalancedLayout(db *h.DagBuilderHelper) (*dag.Node, error) {
// it returns the total dataSize of the node, and a potential error
//
// warning: **children** pinned indirectly, but input node IS NOT pinned.
func fillNodeRec(db *h.DagBuilderHelper, node *h.UnixfsNode, depth int) error {
func fillNodeRec(db *h.DagBuilderHelper, node *h.UnixfsNode, depth int, offset uint64) error {
if depth < 0 {
return errors.New("attempt to fillNode at depth < 0")
}
Expand All @@ -62,14 +66,16 @@ func fillNodeRec(db *h.DagBuilderHelper, node *h.UnixfsNode, depth int) error {
// while we have room AND we're not done
for node.NumChildren() < db.Maxlinks() && !db.Done() {
child := h.NewUnixfsNode()
db.SetPosInfo(child, offset)

if err := fillNodeRec(db, child, depth-1); err != nil {
if err := fillNodeRec(db, child, depth-1, offset); err != nil {
return err
}

if err := node.AddChild(child, db); err != nil {
return err
}
offset += child.FileSize()
}

return nil
Expand Down
10 changes: 8 additions & 2 deletions importer/chunk/rabin.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ import (
var IpfsRabinPoly = chunker.Pol(17437180132763653)

// Rabin splits a stream into content-defined chunks using a Rabin
// fingerprint chunker. The original reader is retained so it can be
// exposed through the Splitter Reader() accessor.
type Rabin struct {
	r      *chunker.Chunker
	reader io.Reader
}

func NewRabin(r io.Reader, avgBlkSize uint64) *Rabin {
Expand All @@ -25,7 +26,8 @@ func NewRabinMinMax(r io.Reader, min, avg, max uint64) *Rabin {
ch := chunker.New(r, IpfsRabinPoly, h, avg, min, max)

return &Rabin{
r: ch,
r: ch,
reader: r,
}
}

Expand All @@ -37,3 +39,7 @@ func (r *Rabin) NextBytes() ([]byte, error) {

return ch.Data, nil
}

// Reader returns the underlying reader this chunker was constructed with.
func (r *Rabin) Reader() io.Reader {
	return r.reader
}
5 changes: 5 additions & 0 deletions importer/chunk/splitting.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ var log = logging.Logger("chunk")
var DefaultBlockSize int64 = 1024 * 256

// Splitter reads a stream and partitions it into chunks. Reader returns the
// underlying io.Reader the splitter consumes from.
type Splitter interface {
	Reader() io.Reader
	NextBytes() ([]byte, error)
}

Expand Down Expand Up @@ -77,3 +78,7 @@ func (ss *sizeSplitterv2) NextBytes() ([]byte, error) {

return buf[:n], nil
}

// Reader returns the underlying reader this splitter consumes from.
func (ss *sizeSplitterv2) Reader() io.Reader {
	return ss.r
}
Loading