Skip to content

Commit

Permalink
Implement caching in GetLinks(), make EnumerateChildrenAsync use GetL…
Browse files Browse the repository at this point in the history
…inks

GetLinks() now is using link cache in datastore under /local/links and
will not necessarily fetch linked nodes. This also affects
EnumerateChildrenAsync and EnumerateChildren, and their previous
behaviour
can be reproduced by using FetchingVisitor from merkledag module.

License: MIT
Signed-off-by: Iaroslav Gridin <voker57@gmail.com>
  • Loading branch information
Voker57 committed Jun 28, 2017
1 parent 07162dd commit 2ce89db
Show file tree
Hide file tree
Showing 17 changed files with 249 additions and 92 deletions.
29 changes: 29 additions & 0 deletions blockservice/blockservice.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ type BlockService interface {
AddBlock(o blocks.Block) (*cid.Cid, error)
AddBlocks(bs []blocks.Block) ([]*cid.Cid, error)
GetBlock(ctx context.Context, c *cid.Cid) (blocks.Block, error)
FetchBlock(ctx context.Context, c *cid.Cid) error
GetBlocks(ctx context.Context, ks []*cid.Cid) <-chan blocks.Block
DeleteBlock(o blocks.Block) error
Close() error
Expand Down Expand Up @@ -168,6 +169,34 @@ func (s *blockService) GetBlock(ctx context.Context, c *cid.Cid) (blocks.Block,
return nil, err
}

// FetchBlock ensures requested block is present in local storage
func (bs *blockService) FetchBlock(ctx context.Context, c *cid.Cid) error {
log.Debugf("BlockService FetchBlock: '%s'", c)

present, err := bs.blockstore.Has(c)
if err != nil {
return err
}
if present == true {
return nil
}

if bs.exchange != nil {
log.Debug("Blockservice: Searching bitswap")
_, err := bs.exchange.GetBlock(ctx, c)
if err != nil {
if err == blockstore.ErrNotFound {
return ErrNotFound
}
return err
}
return nil
}

log.Debug("Blockservice FetchBlock: Not found")
return ErrNotFound
}

// GetBlocks gets a list of blocks asynchronously and returns through
// the returned channel.
// NB: No guarantees are made about order.
Expand Down
4 changes: 2 additions & 2 deletions core/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,9 +218,9 @@ func setupNode(ctx context.Context, n *IpfsNode, cfg *BuildCfg) error {
}

n.Blocks = bserv.New(n.Blockstore, n.Exchange)
n.DAG = dag.NewDAGService(n.Blocks)
n.DAG = dag.NewDAGService(n.Blocks, n.Repo.Datastore())

internalDag := dag.NewDAGService(bserv.New(n.Blockstore, offline.Exchange(n.Blockstore)))
internalDag := dag.NewDAGService(bserv.New(n.Blockstore, offline.Exchange(n.Blockstore)), n.Repo.Datastore())
n.Pinning, err = pin.LoadPinner(n.Repo.Datastore(), n.DAG, internalDag)
if err != nil {
// TODO: we should move towards only running 'NewPinner' explicity on
Expand Down
3 changes: 2 additions & 1 deletion core/commands/add.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,13 +227,14 @@ You can now refer to the added file in a gateway, like so:
}

exch := n.Exchange

local, _, _ := req.Option("local").Bool()
if local {
exch = offline.Exchange(addblockstore)
}

bserv := blockservice.New(addblockstore, exch)
dserv := dag.NewDAGService(bserv)
dserv := dag.NewDAGService(bserv, n.Repo.Datastore())

fileAdder, err := coreunix.NewAdder(req.Context(), n.Pinning, n.Blockstore, dserv)
if err != nil {
Expand Down
3 changes: 1 addition & 2 deletions core/commands/dht.go
Original file line number Diff line number Diff line change
Expand Up @@ -378,8 +378,7 @@ func provideKeysRec(ctx context.Context, r routing.IpfsRouting, dserv dag.DAGSer
provided := cid.NewSet()
for _, c := range cids {
kset := cid.NewSet()

err := dag.EnumerateChildrenAsync(ctx, dag.GetLinksDirect(dserv), c, kset.Visit)
err := dag.EnumerateChildrenAsync(ctx, dag.GetLinksDirect(dserv), c, dag.FetchingVisitor(ctx, kset, dserv, cid.NewSet()))
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion core/commands/ls.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ The JSON output contains type information.
if !resolve {
offlineexch := offline.Exchange(nd.Blockstore)
bserv := blockservice.New(nd.Blockstore, offlineexch)
dserv = merkledag.NewDAGService(bserv)
dserv = merkledag.NewDAGService(bserv, nd.Repo.Datastore())
}

paths := req.Arguments()
Expand Down
2 changes: 1 addition & 1 deletion core/coreunix/add.go
Original file line number Diff line number Diff line change
Expand Up @@ -552,7 +552,7 @@ func NewMemoryDagService() dag.DAGService {
// build mem-datastore for editor's intermediary nodes
bs := bstore.NewBlockstore(syncds.MutexWrap(ds.NewMapDatastore()))
bsrv := bserv.New(bs, offline.Exchange(bs))
return dag.NewDAGService(bsrv)
return dag.NewDAGService(bsrv, syncds.MutexWrap(ds.NewMapDatastore()))
}

// from core/commands/object.go
Expand Down
9 changes: 6 additions & 3 deletions core/coreunix/add_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import (
pi "github.com/ipfs/go-ipfs/thirdparty/posinfo"
"github.com/ipfs/go-ipfs/thirdparty/testutil"

ds "gx/ipfs/QmRWDav6mzWseLWeYfVd5fvUKiVe9xNH29YfMF438fG364/go-datastore"
syncds "gx/ipfs/QmRWDav6mzWseLWeYfVd5fvUKiVe9xNH29YfMF438fG364/go-datastore/sync"
cid "gx/ipfs/QmYhQaCYEcaPPjxJX7YcPcVKkQfRy6sJ7B3XmGFk82XYdQ/go-cid"
)

Expand Down Expand Up @@ -148,8 +150,9 @@ func TestAddGCLive(t *testing.T) {
defer cancel()

set := cid.NewSet()
err = dag.EnumerateChildren(ctx, node.DAG.GetLinks, last, set.Visit)
if err != nil {
missing := cid.NewSet()
err = dag.EnumerateChildren(ctx, node.DAG.GetLinks, last, dag.FetchingVisitor(ctx, set, node.DAG, missing))
if err != nil || missing.Len() > 0 {
t.Fatal(err)
}
}
Expand All @@ -170,7 +173,7 @@ func testAddWPosInfo(t *testing.T, rawLeaves bool) {

bs := &testBlockstore{GCBlockstore: node.Blockstore, expectedPath: "/tmp/foo.txt", t: t}
bserv := blockservice.New(bs, node.Exchange)
dserv := dag.NewDAGService(bserv)
dserv := dag.NewDAGService(bserv, syncds.MutexWrap(ds.NewMapDatastore()))
adder, err := NewAdder(context.Background(), node.Pinning, bs, dserv)
if err != nil {
t.Fatal(err)
Expand Down
2 changes: 1 addition & 1 deletion core/coreunix/metadata_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ func getDagserv(t *testing.T) merkledag.DAGService {
db := dssync.MutexWrap(ds.NewMapDatastore())
bs := bstore.NewBlockstore(db)
blockserv := bserv.New(bs, offline.Exchange(bs))
return merkledag.NewDAGService(blockserv)
return merkledag.NewDAGService(blockserv, db)
}

func TestMetadata(t *testing.T) {
Expand Down
22 changes: 22 additions & 0 deletions merkledag/coding.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,28 @@ import (
// for now, we use a PBNode intermediate thing.
// because native go objects are nice.

func intermediateMarshal(l *node.Link) *pb.PBLink {
pbl := &pb.PBLink{}
pbl.Name = &l.Name
pbl.Tsize = &l.Size
if l.Cid != nil {
pbl.Hash = l.Cid.Bytes()
}
return pbl
}

func intermediateUnmarshal(pbl *pb.PBLink) (*node.Link, error) {
l := new(node.Link)
l.Name = pbl.GetName()
l.Size = pbl.GetTsize()
c, err := cid.Cast(pbl.GetHash())
if err != nil {
return nil, fmt.Errorf("Link hash is not valid multihash. %v", err)
}
l.Cid = c
return l, nil
}

// unmarshal decodes raw data into a *Node instance.
// The conversion uses an intermediate PBNode.
func (n *ProtoNode) unmarshal(encoded []byte) error {
Expand Down
Loading

0 comments on commit 2ce89db

Please sign in to comment.