Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: proper tsize encoding in sharded files #65

Merged
merged 3 commits into from
Aug 25, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 40 additions & 5 deletions data/builder/dir_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,14 @@ package builder
import (
"bytes"
"fmt"
"io"
"os"
"path/filepath"
"strconv"
"testing"

"github.com/ipfs/go-cid"
u "github.com/ipfs/go-ipfs-util"
rvagg marked this conversation as resolved.
Show resolved Hide resolved
"github.com/ipfs/go-unixfsnode"
dagpb "github.com/ipld/go-codec-dagpb"
"github.com/ipld/go-ipld-prime"
Expand All @@ -19,11 +22,7 @@ func mkEntries(cnt int, ls *ipld.LinkSystem) ([]dagpb.PBLink, error) {
entries := make([]dagpb.PBLink, 0, cnt)
for i := 0; i < cnt; i++ {
r := bytes.NewBufferString(fmt.Sprintf("%d", i))
f, s, err := BuildUnixFSFile(r, "", ls)
if err != nil {
return nil, err
}
e, err := BuildUnixFSDirectoryEntry(fmt.Sprintf("file %d", i), int64(s), f)
e, err := mkEntry(r, fmt.Sprintf("file %d", i), ls)
if err != nil {
return nil, err
}
Expand All @@ -32,6 +31,42 @@ func mkEntries(cnt int, ls *ipld.LinkSystem) ([]dagpb.PBLink, error) {
return entries, nil
}

// mkEntry stores the content of r as a UnixFS file via BuildUnixFSFile (using
// an empty chunker string) and returns a directory entry called name that
// links to that file and carries the size BuildUnixFSFile reported for it.
func mkEntry(r io.Reader, name string, ls *ipld.LinkSystem) (dagpb.PBLink, error) {
	fileLink, fileSize, buildErr := BuildUnixFSFile(r, "", ls)
	if buildErr != nil {
		return nil, buildErr
	}
	entry, entryErr := BuildUnixFSDirectoryEntry(name, int64(fileSize), fileLink)
	return entry, entryErr
}

// TestBuildUnixFSFileWrappedInDirectory_Reference runs one subtest per entry
// of referenceTestCases. Each subtest builds a file of tc.size generated
// bytes, wraps it in a single-entry directory, and checks that the directory
// CID equals tc.wrappedExpected. It also checks that the size returned by
// BuildUnixFSDirectory equals the byte total of every block in the store.
func TestBuildUnixFSFileWrappedInDirectory_Reference(t *testing.T) {
	for _, tc := range referenceTestCases {
		t.Run(strconv.Itoa(tc.size), func(t *testing.T) {
			// Payload: tc.size bytes read from a generator seeded with 0xdeadbeef.
			payload := make([]byte, tc.size)
			u.NewSeededRand(0xdeadbeef).Read(payload)

			// In-memory block store wired into a default link system.
			store := cidlink.Memory{}
			lsys := cidlink.DefaultLinkSystem()
			lsys.StorageReadOpener = store.OpenRead
			lsys.StorageWriteOpener = store.OpenWrite

			entry, err := mkEntry(bytes.NewReader(payload), fmt.Sprintf("%d", tc.size), &lsys)
			require.NoError(t, err)

			root, reported, err := BuildUnixFSDirectory([]dagpb.PBLink{entry}, &lsys)
			require.NoError(t, err)
			require.Equal(t, tc.wrappedExpected.String(), root.(cidlink.Link).Cid.String())

			// The reported size must equal the byte total of everything the
			// store holds after the build.
			stored := 0
			for _, block := range store.Bag {
				stored += len(block)
			}
			require.Equal(t, stored, int(reported))
		})
	}
}

func TestBuildUnixFSDirectory(t *testing.T) {
ls := cidlink.DefaultLinkSystem()
storage := cidlink.Memory{}
Expand Down
173 changes: 112 additions & 61 deletions data/builder/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/ipfs/go-unixfsnode/data"
dagpb "github.com/ipld/go-codec-dagpb"
"github.com/ipld/go-ipld-prime"
"github.com/ipld/go-ipld-prime/datamodel"
cidlink "github.com/ipld/go-ipld-prime/linking/cid"
basicnode "github.com/ipld/go-ipld-prime/node/basic"
"github.com/multiformats/go-multicodec"
Expand All @@ -18,6 +19,38 @@ import (
_ "github.com/ipld/go-ipld-prime/codec/raw"
)

// fileShardMeta describes one node produced while building a file DAG, as
// returned by fileTreeRecursive. The zero value (nil link) is what
// fileTreeRecursive returns when the source is exhausted or a level is empty.
type fileShardMeta struct {
// link is the link returned by sizedStore for this node.
link datamodel.Link
// byteSize is the number of file bytes under this node: len(leaf) for a
// leaf, the sum of the children's byteSize for an intermediate node.
byteSize uint64
// storedSize is the size sizedStore reported for this node, plus the
// storedSize of all of its children for an intermediate node.
storedSize uint64
}

// fileShards is an ordered list of fileShardMeta; fileTreeRecursive uses it to
// hold the children collected for the node it is building.
type fileShards []fileShardMeta

// totalByteSize returns the sum of the byteSize field over every shard in fs.
// It returns 0 when fs is nil or empty.
func (fs fileShards) totalByteSize() uint64 {
	sum := uint64(0)
	for i := range fs {
		sum += fs[i].byteSize
	}
	return sum
}

// totalStoredSize returns the sum of the storedSize field over every shard in
// fs. It returns 0 when fs is nil or empty.
func (fs fileShards) totalStoredSize() uint64 {
	sum := uint64(0)
	for i := range fs {
		sum += fs[i].storedSize
	}
	return sum
}

// byteSizes returns a new slice holding the byteSize of each shard in fs, in
// the same order. The result has length len(fs) and is never nil.
func (fs fileShards) byteSizes() []uint64 {
	out := make([]uint64, 0, len(fs))
	for i := range fs {
		out = append(out, fs[i].byteSize)
	}
	return out
}

// BuildUnixFSFile creates a dag of ipld Nodes representing file data.
// This recreates the functionality previously found in
// github.com/ipfs/go-unixfs/importer/balanced, but tailored to the
Expand All @@ -28,31 +61,29 @@ import (
// data nodes are stored as raw bytes.
// ref: https://github.com/ipfs/go-mfs/blob/1b1fd06cff048caabeddb02d4dbf22d2274c7971/file.go#L50
func BuildUnixFSFile(r io.Reader, chunker string, ls *ipld.LinkSystem) (ipld.Link, uint64, error) {
s, err := chunk.FromString(r, chunker)
src, err := chunk.FromString(r, chunker)
if err != nil {
return nil, 0, err
}

var prev []ipld.Link
var prevLen []uint64
var prev fileShards
depth := 1
for {
root, size, err := fileTreeRecursive(depth, prev, prevLen, s, ls)
next, err := fileTreeRecursive(depth, prev, src, ls)
if err != nil {
return nil, 0, err
}

if prev != nil && prev[0] == root {
if root == nil {
if prev != nil && prev[0].link == next.link {
if next.link == nil {
node := basicnode.NewBytes([]byte{})
link, err := ls.Store(ipld.LinkContext{}, leafLinkProto, node)
return link, 0, err
}
return root, size, nil
return next.link, next.storedSize, nil
}

prev = []ipld.Link{root}
prevLen = []uint64{size}
prev = []fileShardMeta{next}
depth++
}
}
Expand All @@ -75,102 +106,122 @@ var leafLinkProto = cidlink.LinkPrototype{
},
}

func fileTreeRecursive(depth int, children []ipld.Link, childLen []uint64, src chunk.Splitter, ls *ipld.LinkSystem) (ipld.Link, uint64, error) {
if depth == 1 && len(children) > 0 {
return nil, 0, fmt.Errorf("leaf nodes cannot have children")
} else if depth == 1 {
// fileTreeRecursive packs a file into chunks recursively, returning a root for
// this level of recursion, the number of file bytes consumed for this level of
// recursion and the number of bytes used to store this level of recursion.
func fileTreeRecursive(
depth int,
children fileShards,
src chunk.Splitter,
ls *ipld.LinkSystem,
) (fileShardMeta, error) {
if depth == 1 {
// file leaf, next chunk, encode as raw bytes, store and return
if len(children) > 0 {
return fileShardMeta{}, fmt.Errorf("leaf nodes cannot have children")
}
leaf, err := src.NextBytes()
if err == io.EOF {
return nil, 0, nil
} else if err != nil {
return nil, 0, err
if err != nil {
if err == io.EOF {
return fileShardMeta{}, nil
}
return fileShardMeta{}, err
}
node := basicnode.NewBytes(leaf)
return sizedStore(ls, leafLinkProto, node)
l, sz, err := sizedStore(ls, leafLinkProto, node)
if err != nil {
return fileShardMeta{}, err
}
return fileShardMeta{link: l, byteSize: uint64(len(leaf)), storedSize: sz}, nil
}
// depth > 1.
totalSize := uint64(0)
blksizes := make([]uint64, 0, DefaultLinksPerBlock)

// depth > 1

if children == nil {
children = make([]ipld.Link, 0)
} else {
for i := range children {
blksizes = append(blksizes, childLen[i])
totalSize += childLen[i]
}
children = make(fileShards, 0)
}

// fill up the links for this level, if we need to go beyond
// DefaultLinksPerBlock we'll end up back here making a parallel tree
for len(children) < DefaultLinksPerBlock {
nxt, sz, err := fileTreeRecursive(depth-1, nil, nil, src, ls)
// descend down toward the leaves
next, err := fileTreeRecursive(depth-1, nil, src, ls)
if err != nil {
return nil, 0, err
} else if nxt == nil {
// eof
return fileShardMeta{}, err
} else if next.link == nil { // eof
break
}
totalSize += sz
children = append(children, nxt)
childLen = append(childLen, sz)
blksizes = append(blksizes, sz)
children = append(children, next)
}

if len(children) == 0 {
// empty case.
return nil, 0, nil
// empty case
return fileShardMeta{}, nil
} else if len(children) == 1 {
// degenerate case
return children[0], childLen[0], nil
return children[0], nil
}

// make the unixfs node.
// make the unixfs node
node, err := BuildUnixFS(func(b *Builder) {
FileSize(b, totalSize)
BlockSizes(b, blksizes)
FileSize(b, children.totalByteSize())
BlockSizes(b, children.byteSizes())
})
if err != nil {
return nil, 0, err
return fileShardMeta{}, err
}
pbn, err := packFileChildren(node, children)
if err != nil {
return fileShardMeta{}, err
}

// Pack into the dagpb node.
link, sz, err := sizedStore(ls, fileLinkProto, pbn)
if err != nil {
return fileShardMeta{}, err
}
return fileShardMeta{
link: link,
byteSize: children.totalByteSize(),
storedSize: children.totalStoredSize() + sz,
}, nil
}

func packFileChildren(node data.UnixFSData, children fileShards) (datamodel.Node, error) {
dpbb := dagpb.Type.PBNode.NewBuilder()
pbm, err := dpbb.BeginMap(2)
if err != nil {
return nil, 0, err
return nil, err
}
pblb, err := pbm.AssembleEntry("Links")
if err != nil {
return nil, 0, err
return nil, err
}
pbl, err := pblb.BeginList(int64(len(children)))
if err != nil {
return nil, 0, err
return nil, err
}
for i, c := range children {
pbln, err := BuildUnixFSDirectoryEntry("", int64(blksizes[i]), c)
for _, c := range children {
pbln, err := BuildUnixFSDirectoryEntry("", int64(c.storedSize), c.link)
if err != nil {
return nil, 0, err
return nil, err
}
if err = pbl.AssembleValue().AssignNode(pbln); err != nil {
return nil, 0, err
return nil, err
}
}
if err = pbl.Finish(); err != nil {
return nil, 0, err
return nil, err
}
if err = pbm.AssembleKey().AssignString("Data"); err != nil {
return nil, 0, err
return nil, err
}
if err = pbm.AssembleValue().AssignBytes(data.EncodeUnixFSData(node)); err != nil {
return nil, 0, err
return nil, err
}
if err = pbm.Finish(); err != nil {
return nil, 0, err
}
pbn := dpbb.Build()

link, _, err := sizedStore(ls, fileLinkProto, pbn)
if err != nil {
return nil, 0, err
return nil, err
}
return link, totalSize, nil
return dpbb.Build(), nil
}

// BuildUnixFSDirectoryEntry creates the link to a file or directory as it appears within a unixfs directory.
Expand Down
Loading