From 8fc425787694c48c0a9542e7814afa5cb0f7b5d2 Mon Sep 17 00:00:00 2001 From: Jeromy Date: Sat, 7 Jan 2017 05:46:17 -0800 Subject: [PATCH 1/3] rewrite enumerate children async to be less fragile License: MIT Signed-off-by: Jeromy --- merkledag/merkledag.go | 143 +++++++++++++++++------------------- merkledag/merkledag_test.go | 43 +++++++++++ 2 files changed, 109 insertions(+), 77 deletions(-) diff --git a/merkledag/merkledag.go b/merkledag/merkledag.go index 50ea3443868..5ab7daebe49 100644 --- a/merkledag/merkledag.go +++ b/merkledag/merkledag.go @@ -389,103 +389,92 @@ func EnumerateChildren(ctx context.Context, ds LinkService, root *cid.Cid, visit return nil } -func EnumerateChildrenAsync(ctx context.Context, ds DAGService, c *cid.Cid, visit func(*cid.Cid) bool) error { - toprocess := make(chan []*cid.Cid, 8) - nodes := make(chan *NodeOption, 8) - - ctx, cancel := context.WithCancel(ctx) - defer cancel() - defer close(toprocess) +// FetchGraphConcurrency is total number of concurrent fetches that +// 'fetchNodes' will start at a time +var FetchGraphConcurrency = 8 - go fetchNodes(ctx, ds, toprocess, nodes) +func EnumerateChildrenAsync(ctx context.Context, ds DAGService, c *cid.Cid, visit func(*cid.Cid) bool) error { + if !visit(c) { + return nil + } root, err := ds.Get(ctx, c) if err != nil { return err } - nodes <- &NodeOption{Node: root} - live := 1 - - for { - select { - case opt, ok := <-nodes: - if !ok { - return nil - } - - if opt.Err != nil { - return opt.Err - } - - nd := opt.Node - - // a node has been fetched - live-- - - var cids []*cid.Cid - for _, lnk := range nd.Links() { - c := lnk.Cid - if visit(c) { - live++ - cids = append(cids, c) + feed := make(chan node.Node) + out := make(chan *NodeOption) + done := make(chan struct{}) + + var setlk sync.Mutex + + for i := 0; i < FetchGraphConcurrency; i++ { + go func() { + for n := range feed { + links := n.Links() + cids := make([]*cid.Cid, 0, len(links)) + for _, l := range links { + setlk.Lock() + unseen := visit(l.Cid) + setlk.Unlock() + if unseen { + cids = append(cids, l.Cid) + } } - } - - if live == 0 { - return nil - } - if len(cids) > 0 { + for nopt := range ds.GetMany(ctx, cids) { + select { + case out <- nopt: + case <-ctx.Done(): + return + } + } select { - case toprocess <- cids: + case done <- struct{}{}: case <-ctx.Done(): - return ctx.Err() } } - case <-ctx.Done(): - return ctx.Err() - } + }() } -} + defer close(feed) -// FetchGraphConcurrency is total number of concurrenct fetches that -// 'fetchNodes' will start at a time -var FetchGraphConcurrency = 8 - -func fetchNodes(ctx context.Context, ds DAGService, in <-chan []*cid.Cid, out chan<- *NodeOption) { - var wg sync.WaitGroup - defer func() { - // wait for all 'get' calls to complete so we don't accidentally send - // on a closed channel - wg.Wait() - close(out) - }() + send := feed + var todobuffer []node.Node + var inProgress int - rateLimit := make(chan struct{}, FetchGraphConcurrency) + next := root + for { + select { + case send <- next: + inProgress++ + if len(todobuffer) > 0 { + next = todobuffer[0] + todobuffer = todobuffer[1:] + } else { + next = nil + send = nil + } + case <-done: + inProgress-- + if inProgress == 0 && next == nil { + return nil + } + case nc := <-out: + if nc.Err != nil { + return nc.Err + } - get := func(ks []*cid.Cid) { - defer wg.Done() - defer func() { - <-rateLimit - }() - nodes := ds.GetMany(ctx, ks) - for opt := range nodes { - select { - case out <- opt: - case <-ctx.Done(): - return + if next == nil { + next = nc.Node + send = feed + } else { + todobuffer = append(todobuffer, nc.Node) } - } - } - for ks := range in { - select { - case rateLimit <- struct{}{}: case <-ctx.Done(): - return + return ctx.Err() } - wg.Add(1) - go get(ks) } + } diff --git a/merkledag/merkledag_test.go b/merkledag/merkledag_test.go index 4f793eae81e..c55a7b551f3 100644 --- a/merkledag/merkledag_test.go +++ b/merkledag/merkledag_test.go @@ -504,3 +504,46 @@ func TestCidRawDoesnNeedData(t *testing.T) { t.Fatal("raw node shouldn't have any links") } } + +func TestEnumerateAsyncFailsNotFound(t *testing.T) { + a := NodeWithData([]byte("foo1")) + b := NodeWithData([]byte("foo2")) + c := NodeWithData([]byte("foo3")) + d := NodeWithData([]byte("foo4")) + + ds := dstest.Mock() + for _, n := range []node.Node{a, b, c} { + _, err := ds.Add(n) + if err != nil { + t.Fatal(err) + } + } + + parent := new(ProtoNode) + if err := parent.AddNodeLinkClean("a", a); err != nil { + t.Fatal(err) + } + + if err := parent.AddNodeLinkClean("b", b); err != nil { + t.Fatal(err) + } + + if err := parent.AddNodeLinkClean("c", c); err != nil { + t.Fatal(err) + } + + if err := parent.AddNodeLinkClean("d", d); err != nil { + t.Fatal(err) + } + + pcid, err := ds.Add(parent) + if err != nil { + t.Fatal(err) + } + + cset := cid.NewSet() + err = EnumerateChildrenAsync(context.Background(), ds, pcid, cset.Visit) + if err == nil { + t.Fatal("this should have failed") + } +} From eac1fa2a3f319899f8e02b177f3acb74571634ee Mon Sep 17 00:00:00 2001 From: Jeromy Date: Wed, 11 Jan 2017 04:42:39 -0800 Subject: [PATCH 2/3] make pinning use serial graph enumeration License: MIT Signed-off-by: Jeromy --- merkledag/merkledag.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/merkledag/merkledag.go b/merkledag/merkledag.go index 5ab7daebe49..f508b950c36 100644 --- a/merkledag/merkledag.go +++ b/merkledag/merkledag.go @@ -140,7 +140,7 @@ func (n *dagService) Remove(nd node.Node) error { // FetchGraph fetches all nodes that are children of the given node func FetchGraph(ctx context.Context, c *cid.Cid, serv DAGService) error { - return EnumerateChildrenAsync(ctx, serv, c, cid.NewSet().Visit) + return EnumerateChildren(ctx, serv, c, cid.NewSet().Visit, false) } // FindLinks searches this nodes links for the given key, From 397a35a33a0a4e0c9bfcca33892258fb718a0d81 Mon Sep 17 00:00:00 2001 From: Jeromy Date: Thu, 12 Jan 2017 13:51:53 -0800 Subject: [PATCH 3/3] update test to print error from EnumerateChildren License: MIT Signed-off-by: Jeromy --- test/sharness/t0081-repo-pinning.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/sharness/t0081-repo-pinning.sh b/test/sharness/t0081-repo-pinning.sh index 319cc39fddf..17c98a4f1c9 100755 --- a/test/sharness/t0081-repo-pinning.sh +++ b/test/sharness/t0081-repo-pinning.sh @@ -240,7 +240,7 @@ test_expect_success "some are no longer there" ' test_expect_success "recursive pin fails without objects" ' ipfs pin rm -r=false "$HASH_DIR1" && test_must_fail ipfs pin add -r "$HASH_DIR1" 2>err_expected8 && - grep "pin: failed to fetch all nodes" err_expected8 || + grep "pin: merkledag: not found" err_expected8 || test_fsh cat err_expected8 '