Skip to content

Commit

Permalink
Merge pull request ipfs/go-unixfs#19 from ipfs/features/speed-up-shar…
Browse files Browse the repository at this point in the history
…ding-fetch-4908

Use EnumerateChildrenAsync in for enumerating HAMT links

This commit was moved from ipfs/go-unixfs@2ae0707
  • Loading branch information
hannahhoward authored Oct 5, 2018
2 parents a9fc5f7 + 7a3f606 commit faac1ac
Show file tree
Hide file tree
Showing 2 changed files with 95 additions and 12 deletions.
93 changes: 82 additions & 11 deletions unixfs/ipld-merkledag/hamt/hamt.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"context"
"fmt"
"os"
"sync"

bitfield "github.com/Stebalien/go-bitfield"
cid "github.com/ipfs/go-cid"
Expand Down Expand Up @@ -104,7 +105,6 @@ func NewHamtFromDag(dserv ipld.DAGService, nd ipld.Node) (*Shard, error) {
return nil, err
}


if fsn.Type() != format.THAMTShard {
return nil, fmt.Errorf("node was not a dir shard")
}
Expand Down Expand Up @@ -202,6 +202,14 @@ func (sv *shardValue) Label() string {
return sv.key
}

func (ds *Shard) makeShardValue(lnk *ipld.Link) *shardValue {
lnk2 := *lnk
return &shardValue{
key: lnk.Name[ds.maxpadlen:],
val: &lnk2,
}
}

func hash(val []byte) []byte {
h := murmur3.New64()
h.Write(val)
Expand Down Expand Up @@ -253,6 +261,24 @@ func (ds *Shard) Find(ctx context.Context, name string) (*ipld.Link, error) {
return out, nil
}

type linkType int

const (
invalidLink linkType = iota
shardLink
shardValueLink
)

func (ds *Shard) childLinkType(lnk *ipld.Link) (linkType, error) {
if len(lnk.Name) < ds.maxpadlen {
return invalidLink, fmt.Errorf("invalid link name '%s'", lnk.Name)
}
if len(lnk.Name) == ds.maxpadlen {
return shardLink, nil
}
return shardValueLink, nil
}

// getChild returns the i'th child of this shard. If it is cached in the
// children array, it will return it from there. Otherwise, it loads the child
// node from disk.
Expand All @@ -277,12 +303,13 @@ func (ds *Shard) getChild(ctx context.Context, i int) (child, error) {
// as a 'child' interface
func (ds *Shard) loadChild(ctx context.Context, i int) (child, error) {
lnk := ds.nd.Links()[i]
if len(lnk.Name) < ds.maxpadlen {
return nil, fmt.Errorf("invalid link name '%s'", lnk.Name)
lnkLinkType, err := ds.childLinkType(lnk)
if err != nil {
return nil, err
}

var c child
if len(lnk.Name) == ds.maxpadlen {
if lnkLinkType == shardLink {
nd, err := lnk.GetNode(ctx, ds.dserv)
if err != nil {
return nil, err
Expand All @@ -294,11 +321,7 @@ func (ds *Shard) loadChild(ctx context.Context, i int) (child, error) {

c = cds
} else {
lnk2 := *lnk
c = &shardValue{
key: lnk.Name[ds.maxpadlen:],
val: &lnk2,
}
c = ds.makeShardValue(lnk)
}

ds.children[i] = c
Expand Down Expand Up @@ -383,10 +406,20 @@ func (ds *Shard) getValue(ctx context.Context, hv *hashBits, key string, cb func
// EnumLinks collects all links in the Shard.
func (ds *Shard) EnumLinks(ctx context.Context) ([]*ipld.Link, error) {
var links []*ipld.Link
err := ds.ForEachLink(ctx, func(l *ipld.Link) error {
links = append(links, l)
var setlk sync.Mutex

getLinks := makeAsyncTrieGetLinks(ds.dserv, func(sv *shardValue) error {
lnk := sv.val
lnk.Name = sv.key
setlk.Lock()
links = append(links, lnk)
setlk.Unlock()
return nil
})

cset := cid.NewSet()

err := dag.EnumerateChildrenAsync(ctx, getLinks, ds.nd.Cid(), cset.Visit)
return links, err
}

Expand All @@ -400,6 +433,44 @@ func (ds *Shard) ForEachLink(ctx context.Context, f func(*ipld.Link) error) erro
})
}

// makeAsyncTrieGetLinks builds a getLinks function that can be used with EnumerateChildrenAsync
// to iterate a HAMT shard. It takes an IPLD Dag Service to fetch nodes, and a call back that will get called
// on all links to leaf nodes in a HAMT tree, so they can be collected for an EnumLinks operation
func makeAsyncTrieGetLinks(dagService ipld.DAGService, onShardValue func(*shardValue) error) dag.GetLinks {

return func(ctx context.Context, currentCid cid.Cid) ([]*ipld.Link, error) {
node, err := dagService.Get(ctx, currentCid)
if err != nil {
return nil, err
}
directoryShard, err := NewHamtFromDag(dagService, node)
if err != nil {
return nil, err
}

childShards := make([]*ipld.Link, 0, len(directoryShard.children))
links := directoryShard.nd.Links()
for idx := range directoryShard.children {
lnk := links[idx]
lnkLinkType, err := directoryShard.childLinkType(lnk)

if err != nil {
return nil, err
}
if lnkLinkType == shardLink {
childShards = append(childShards, lnk)
} else {
sv := directoryShard.makeShardValue(lnk)
err := onShardValue(sv)
if err != nil {
return nil, err
}
}
}
return childShards, nil
}
}

func (ds *Shard) walkTrie(ctx context.Context, cb func(*shardValue) error) error {
for idx := range ds.children {
c, err := ds.getChild(ctx, idx)
Expand Down
14 changes: 13 additions & 1 deletion unixfs/ipld-merkledag/hamt/hamt_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

dag "github.com/ipfs/go-merkledag"
mdtest "github.com/ipfs/go-merkledag/test"

ft "github.com/ipfs/go-unixfs"

ipld "github.com/ipfs/go-ipld-format"
Expand Down Expand Up @@ -100,6 +101,8 @@ func assertSerializationWorks(ds ipld.DAGService, s *Shard) error {
return fmt.Errorf("links arrays are different sizes")
}

sort.Stable(dag.LinkSlice(linksA))
sort.Stable(dag.LinkSlice(linksB))
for i, a := range linksA {
b := linksB[i]
if a.Name != b.Name {
Expand Down Expand Up @@ -280,14 +283,17 @@ func TestSetAfterMarshal(t *testing.T) {
t.Fatal(err)
}

empty := ft.EmptyDirNode()
for i := 0; i < 100; i++ {
empty := ft.EmptyDirNode()
err := nds.Set(ctx, fmt.Sprintf("moredirs%d", i), empty)
if err != nil {
t.Fatal(err)
}
}

nd, err = nds.Node()
nds, err = NewHamtFromDag(ds, nd)

links, err := nds.EnumLinks(ctx)
if err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -319,6 +325,9 @@ func TestDuplicateAddShard(t *testing.T) {
t.Fatal(err)
}

node, err := dir.Node()
dir, err = NewHamtFromDag(ds, node)

lnks, err := dir.EnumLinks(ctx)
if err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -411,6 +420,9 @@ func TestRemoveElemsAfterMarshal(t *testing.T) {
}
}

nd, err = nds.Node()
nds, err = NewHamtFromDag(ds, nd)

links, err := nds.EnumLinks(ctx)
if err != nil {
t.Fatal(err)
Expand Down

0 comments on commit faac1ac

Please sign in to comment.