From 49ac7c3aec6b138956ab5db061b9463a39b50834 Mon Sep 17 00:00:00 2001 From: Steven Allen Date: Thu, 18 Feb 2021 12:15:06 -0800 Subject: [PATCH] optimize CheckIfPinned 1. Parallelize fetching from disk. 2. Avoid re-visiting blocks we've already checked. Adding the same data over and over with small changes is pretty common. --- dspinner/pin.go | 48 +++++++++++++++-------------------------------- ipldpinner/pin.go | 34 ++++++++++----------------------- 2 files changed, 25 insertions(+), 57 deletions(-) diff --git a/dspinner/pin.go b/dspinner/pin.go index 5fd65e7..f9f5ff9 100644 --- a/dspinner/pin.go +++ b/dspinner/pin.go @@ -17,6 +17,7 @@ import ( "github.com/ipfs/go-ipfs-pinner/dsindex" ipld "github.com/ipfs/go-ipld-format" logging "github.com/ipfs/go-log" + "github.com/ipfs/go-merkledag" mdag "github.com/ipfs/go-merkledag" "github.com/ipfs/go-merkledag/dagutils" "github.com/polydawn/refmt/cbor" @@ -489,49 +490,30 @@ func (p *pinner) CheckIfPinned(ctx context.Context, cids ...cid.Cid) ([]ipfspinn } } - // Now walk all recursive pins to check for indirect pins - var checkChildren func(cid.Cid, cid.Cid) error - checkChildren = func(rk, parentKey cid.Cid) error { - links, err := ipld.GetLinks(ctx, p.dserv, parentKey) - if err != nil { - return err - } - for _, lnk := range links { - c := lnk.Cid - - if toCheck.Has(c) { - pinned = append(pinned, - ipfspinner.Pinned{Key: c, Mode: ipfspinner.Indirect, Via: rk}) - toCheck.Remove(c) - } - - err = checkChildren(rk, c) - if err != nil { - return err - } - - if toCheck.Len() == 0 { - return nil - } - } - return nil - } - var e error + visited := cid.NewSet() err := p.cidRIndex.ForEach(ctx, "", func(key, value string) bool { var rk cid.Cid rk, e = cid.Cast([]byte(key)) if e != nil { return false } - e = checkChildren(rk, rk) + e = merkledag.Walk(ctx, merkledag.GetLinksWithDAG(p.dserv), rk, func(c cid.Cid) bool { + if toCheck.Len() == 0 || !visited.Visit(c) { + return false + } + + if toCheck.Has(c) { + pinned = append(pinned, ipfspinner.Pinned{Key: c, Mode: ipfspinner.Indirect, Via: rk}) + toCheck.Remove(c) + } + + return true + }, merkledag.Concurrent()) if e != nil { return false } - if toCheck.Len() == 0 { - return false - } - return true + return toCheck.Len() > 0 }) if err != nil { return nil, err diff --git a/ipldpinner/pin.go b/ipldpinner/pin.go index dc90dd4..5620836 100644 --- a/ipldpinner/pin.go +++ b/ipldpinner/pin.go @@ -14,6 +14,7 @@ import ( ds "github.com/ipfs/go-datastore" ipld "github.com/ipfs/go-ipld-format" logging "github.com/ipfs/go-log" + "github.com/ipfs/go-merkledag" mdag "github.com/ipfs/go-merkledag" "github.com/ipfs/go-merkledag/dagutils" @@ -328,35 +329,20 @@ func (p *pinner) CheckIfPinned(ctx context.Context, cids ...cid.Cid) ([]ipfspinn } // Now walk all recursive pins to check for indirect pins - var checkChildren func(cid.Cid, cid.Cid) error - checkChildren = func(rk, parentKey cid.Cid) error { - links, err := ipld.GetLinks(ctx, p.dserv, parentKey) - if err != nil { - return err - } - for _, lnk := range links { - c := lnk.Cid + visited := cid.NewSet() + for _, rk := range p.recursePin.Keys() { + err := merkledag.Walk(ctx, merkledag.GetLinksWithDAG(p.dserv), rk, func(c cid.Cid) bool { + if toCheck.Len() == 0 || !visited.Visit(c) { + return false + } if toCheck.Has(c) { - pinned = append(pinned, - ipfspinner.Pinned{Key: c, Mode: ipfspinner.Indirect, Via: rk}) + pinned = append(pinned, ipfspinner.Pinned{Key: c, Mode: ipfspinner.Indirect, Via: rk}) toCheck.Remove(c) } - err := checkChildren(rk, c) - if err != nil { - return err - } - - if toCheck.Len() == 0 { - return nil - } - } - return nil - } - - for _, rk := range p.recursePin.Keys() { - err := checkChildren(rk, rk) + return true + }, merkledag.Concurrent()) if err != nil { return nil, err }