Skip to content

Commit

Permalink
Implement caching in GetLinks(), make EnumerateChildrenAsync use GetL…
Browse files Browse the repository at this point in the history
…inks

GetLinks() now is using link cache in datastore under /local/links and
will not necessarily fetch linked nodes. This also affects
EnumerateChildrenAsync and EnumerateChildren, and their previous
behaviour
can be reproduced by using FetchingVisitor from merkledag module.

Add `ipfs repo flushlinkcache`

License: MIT
Signed-off-by: Iaroslav Gridin <voker57@gmail.com>
  • Loading branch information
Voker57 committed Sep 9, 2017
1 parent 71d72e2 commit 13b1a8f
Show file tree
Hide file tree
Showing 24 changed files with 359 additions and 78 deletions.
29 changes: 29 additions & 0 deletions blockservice/blockservice.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ type BlockService interface {
AddBlock(o blocks.Block) (*cid.Cid, error)
AddBlocks(bs []blocks.Block) ([]*cid.Cid, error)
GetBlock(ctx context.Context, c *cid.Cid) (blocks.Block, error)
FetchBlock(ctx context.Context, c *cid.Cid) error
GetBlocks(ctx context.Context, ks []*cid.Cid) <-chan blocks.Block
DeleteBlock(o blocks.Block) error
Close() error
Expand Down Expand Up @@ -195,6 +196,34 @@ func getBlock(ctx context.Context, c *cid.Cid, bs blockstore.Blockstore, f excha
return nil, err
}

// FetchBlock ensures requested block is present in local storage
func (bs *blockService) FetchBlock(ctx context.Context, c *cid.Cid) error {
log.Debugf("BlockService FetchBlock: '%s'", c)

present, err := bs.blockstore.Has(c)
if err != nil {
return err
}
if present {
return nil
}

if bs.exchange != nil {
log.Debug("Blockservice: Searching bitswap")
_, err := bs.exchange.GetBlock(ctx, c)
if err != nil {
if err == blockstore.ErrNotFound {
return ErrNotFound
}
return err
}
return nil
}

log.Debug("Blockservice FetchBlock: Not found")
return ErrNotFound
}

// GetBlocks gets a list of blocks asynchronously and returns through
// the returned channel.
// NB: No guarantees are made about order.
Expand Down
4 changes: 2 additions & 2 deletions core/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,9 +218,9 @@ func setupNode(ctx context.Context, n *IpfsNode, cfg *BuildCfg) error {
}

n.Blocks = bserv.New(n.Blockstore, n.Exchange)
n.DAG = dag.NewDAGService(n.Blocks)
n.DAG = dag.NewDAGService(n.Blocks, n.Repo.Datastore())

internalDag := dag.NewDAGService(bserv.New(n.Blockstore, offline.Exchange(n.Blockstore)))
internalDag := dag.NewDAGService(bserv.New(n.Blockstore, offline.Exchange(n.Blockstore)), n.Repo.Datastore())
n.Pinning, err = pin.LoadPinner(n.Repo.Datastore(), n.DAG, internalDag)
if err != nil {
// TODO: we should move towards only running 'NewPinner' explicity on
Expand Down
3 changes: 2 additions & 1 deletion core/commands/add.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,13 +227,14 @@ You can now refer to the added file in a gateway, like so:
}

exch := n.Exchange

local, _, _ := req.Option("local").Bool()
if local {
exch = offline.Exchange(addblockstore)
}

bserv := blockservice.New(addblockstore, exch)
dserv := dag.NewDAGService(bserv)
dserv := dag.NewDAGService(bserv, n.Repo.Datastore())

fileAdder, err := coreunix.NewAdder(req.Context(), n.Pinning, n.Blockstore, dserv)
if err != nil {
Expand Down
3 changes: 2 additions & 1 deletion core/commands/dht.go
Original file line number Diff line number Diff line change
Expand Up @@ -378,8 +378,9 @@ func provideKeysRec(ctx context.Context, r routing.IpfsRouting, dserv dag.DAGSer
provided := cid.NewSet()
for _, c := range cids {
kset := cid.NewSet()
visitor := dag.FetchingVisitor(ctx, kset, dserv)

err := dag.EnumerateChildrenAsync(ctx, dag.GetLinksDirect(dserv), c, kset.Visit)
err := dag.EnumerateChildrenAsync(ctx, dserv.GetLinks, c, visitor)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion core/commands/ls.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ The JSON output contains type information.
if !resolve {
offlineexch := offline.Exchange(nd.Blockstore)
bserv := blockservice.New(nd.Blockstore, offlineexch)
dserv = merkledag.NewDAGService(bserv)
dserv = merkledag.NewDAGService(bserv, nd.Repo.Datastore())
}

paths := req.Arguments()
Expand Down
3 changes: 2 additions & 1 deletion core/commands/pin.go
Original file line number Diff line number Diff line change
Expand Up @@ -547,8 +547,9 @@ func pinLsAll(typeStr string, ctx context.Context, n *core.IpfsNode) (map[string
}
if typeStr == "indirect" || typeStr == "all" {
set := cid.NewSet()
visitor := dag.FetchingVisitor(ctx, set, n.DAG)
for _, k := range n.Pinning.RecursiveKeys() {
err := dag.EnumerateChildren(n.Context(), n.DAG.GetLinks, k, set.Visit)
err := dag.EnumerateChildren(n.Context(), n.DAG.GetLinks, k, visitor)
if err != nil {
return nil, err
}
Expand Down
38 changes: 33 additions & 5 deletions core/commands/repo.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,12 @@ var RepoCmd = &cmds.Command{
},

Subcommands: map[string]*cmds.Command{
"gc": repoGcCmd,
"stat": repoStatCmd,
"fsck": RepoFsckCmd,
"version": repoVersionCmd,
"verify": repoVerifyCmd,
"gc": repoGcCmd,
"stat": repoStatCmd,
"fsck": RepoFsckCmd,
"version": repoVersionCmd,
"verify": repoVerifyCmd,
"flushlinkcache": repoFlushLinkCacheCmd,
},
}

Expand Down Expand Up @@ -206,6 +207,33 @@ Version string The repo version.
},
}

var repoFlushLinkCacheCmd = &cmds.Command{
Helptext: cmds.HelpText{
Tagline: "Flush link cache.",
ShortDescription: `
'ipfs repo flushlinkcache' is a plumbing command that will flush link cache.
`,
},
Run: func(req cmds.Request, res cmds.Response) {
n, err := req.InvocContext().GetNode()
if err != nil {
res.SetError(err, cmds.ErrNormal)
return
}

err = corerepo.FlushLinkCache(req.Context(), n)
if err != nil {
res.SetError(err, cmds.ErrNormal)
return
}
res.SetOutput(&MessageOutput{"Link cache flushed.\n"})
},
Type: MessageOutput{},
Marshalers: cmds.MarshalerMap{
cmds.Text: MessageTextMarshaler,
},
}

var RepoFsckCmd = &cmds.Command{
Helptext: cmds.HelpText{
Tagline: "Remove repo lockfiles.",
Expand Down
25 changes: 25 additions & 0 deletions core/corerepo/linkcache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package corerepo

import (
context "context"
core "github.com/ipfs/go-ipfs/core"
ds "gx/ipfs/QmVSase1JP7cq9QkPT46oNwdp9pT6kBkG3oqS14y3QcZjG/go-datastore"
dsq "gx/ipfs/QmVSase1JP7cq9QkPT46oNwdp9pT6kBkG3oqS14y3QcZjG/go-datastore/query"
)

// FlushLinkCache flushes link cache, deleting all keys in it
func FlushLinkCache(ctx context.Context, n *core.IpfsNode) error {
d := n.Repo.Datastore()
q := dsq.Query{KeysOnly: true, Prefix: "/local/links/"}
qr, err := d.Query(q)
if err != nil {
return err
}
for result := range qr.Next() {
if result.Error != nil {
return result.Error
}
d.Delete(ds.NewKey(result.Entry.Key))
}
return nil
}
2 changes: 1 addition & 1 deletion core/coreunix/add.go
Original file line number Diff line number Diff line change
Expand Up @@ -561,7 +561,7 @@ func NewMemoryDagService() dag.DAGService {
// build mem-datastore for editor's intermediary nodes
bs := bstore.NewBlockstore(syncds.MutexWrap(ds.NewMapDatastore()))
bsrv := bserv.New(bs, offline.Exchange(bs))
return dag.NewDAGService(bsrv)
return dag.NewDAGService(bsrv, syncds.MutexWrap(ds.NewMapDatastore()))
}

// from core/commands/object.go
Expand Down
7 changes: 4 additions & 3 deletions core/coreunix/add_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import (
"gx/ipfs/QmSn9Td7xgxm9EV7iEjTckpUWmWApggzPxu7eFGWkkpwin/go-block-format"

cid "gx/ipfs/QmNp85zy9RLrQ5oQD4hPyS39ezrrXpcaa7R4Y9kxdWQLLQ/go-cid"
ds "gx/ipfs/QmVSase1JP7cq9QkPT46oNwdp9pT6kBkG3oqS14y3QcZjG/go-datastore"
syncds "gx/ipfs/QmVSase1JP7cq9QkPT46oNwdp9pT6kBkG3oqS14y3QcZjG/go-datastore/sync"
)

func TestAddRecursive(t *testing.T) {
Expand Down Expand Up @@ -147,8 +149,7 @@ func TestAddGCLive(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()

set := cid.NewSet()
err = dag.EnumerateChildren(ctx, node.DAG.GetLinks, last, set.Visit)
err = dag.EnumerateChildren(ctx, node.DAG.GetLinks, last, dag.FetchingVisitor(ctx, cid.NewSet(), node.DAG))
if err != nil {
t.Fatal(err)
}
Expand All @@ -170,7 +171,7 @@ func testAddWPosInfo(t *testing.T, rawLeaves bool) {

bs := &testBlockstore{GCBlockstore: node.Blockstore, expectedPath: "/tmp/foo.txt", t: t}
bserv := blockservice.New(bs, node.Exchange)
dserv := dag.NewDAGService(bserv)
dserv := dag.NewDAGService(bserv, syncds.MutexWrap(ds.NewMapDatastore()))
adder, err := NewAdder(context.Background(), node.Pinning, bs, dserv)
if err != nil {
t.Fatal(err)
Expand Down
2 changes: 1 addition & 1 deletion core/coreunix/metadata_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ func getDagserv(t *testing.T) merkledag.DAGService {
db := dssync.MutexWrap(ds.NewMapDatastore())
bs := bstore.NewBlockstore(db)
blockserv := bserv.New(bs, offline.Exchange(bs))
return merkledag.NewDAGService(blockserv)
return merkledag.NewDAGService(blockserv, db)
}

func TestMetadata(t *testing.T) {
Expand Down
5 changes: 4 additions & 1 deletion exchange/reprovide/providers.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,11 @@ func pinSet(ctx context.Context, pinning pin.Pinner, dag merkledag.DAGService, o
for _, key := range pinning.RecursiveKeys() {
set.add(key)

visitor := func(cid *cid.Cid) (bool, error) {
return set.add(key), nil
}
if !onlyRoots {
err := merkledag.EnumerateChildren(ctx, dag.GetLinks, key, set.add)
err := merkledag.EnumerateChildren(ctx, dag.GetLinks, key, visitor)
if err != nil {
log.Errorf("reprovide indirect pins: %s", err)
return
Expand Down
22 changes: 22 additions & 0 deletions merkledag/coding.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,28 @@ import (
// for now, we use a PBNode intermediate thing.
// because native go objects are nice.

func intermediateMarshal(l *node.Link) *pb.PBLink {
pbl := &pb.PBLink{}
pbl.Name = &l.Name
pbl.Tsize = &l.Size
if l.Cid != nil {
pbl.Hash = l.Cid.Bytes()
}
return pbl
}

func intermediateUnmarshal(pbl *pb.PBLink) (*node.Link, error) {
l := new(node.Link)
l.Name = pbl.GetName()
l.Size = pbl.GetTsize()
c, err := cid.Cast(pbl.GetHash())
if err != nil {
return nil, fmt.Errorf("Link hash is not valid multihash. %v", err)
}
l.Cid = c
return l, nil
}

// unmarshal decodes raw data into a *Node instance.
// The conversion uses an intermediate PBNode.
func (n *ProtoNode) unmarshal(encoded []byte) error {
Expand Down
Loading

0 comments on commit 13b1a8f

Please sign in to comment.