Skip to content

Commit

Permalink
Merge pull request ipfs/go-fetcher#11 from ipfs/feat/cid-matching
Browse files Browse the repository at this point in the history
Add helpers for block visiting

This commit was moved from ipfs/go-fetcher@64b8f3e
  • Loading branch information
hannahhoward committed Mar 24, 2021
2 parents b52410a + 33c53ee commit 08eff5c
Show file tree
Hide file tree
Showing 5 changed files with 258 additions and 55 deletions.
26 changes: 18 additions & 8 deletions fetcher/fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,13 +78,8 @@ func (f *fetcherSession) BlockOfType(ctx context.Context, link ipld.Link, ptype
return f.linkSystem.Load(ipld.LinkContext{}, link, ptype)
}

func (f *fetcherSession) NodeMatching(ctx context.Context, node ipld.Node, match selector.Selector, cb FetchCallback) error {
return traversal.Progress{
Cfg: &traversal.Config{
LinkSystem: f.linkSystem,
LinkTargetNodePrototypeChooser: f.protoChooser,
},
}.WalkMatching(node, match, func(prog traversal.Progress, n ipld.Node) error {
func (f *fetcherSession) nodeMatching(ctx context.Context, initialProgress traversal.Progress, node ipld.Node, match selector.Selector, cb FetchCallback) error {
return initialProgress.WalkMatching(node, match, func(prog traversal.Progress, n ipld.Node) error {
return cb(FetchResult{
Node: n,
Path: prog.Path,
Expand All @@ -94,6 +89,19 @@ func (f *fetcherSession) NodeMatching(ctx context.Context, node ipld.Node, match
})
}

func (f *fetcherSession) blankProgress(ctx context.Context) traversal.Progress {
return traversal.Progress{
Cfg: &traversal.Config{
LinkSystem: f.linkSystem,
LinkTargetNodePrototypeChooser: f.protoChooser,
},
}
}

func (f *fetcherSession) NodeMatching(ctx context.Context, node ipld.Node, match selector.Selector, cb FetchCallback) error {
return f.nodeMatching(ctx, f.blankProgress(ctx), node, match, cb)
}

func (f *fetcherSession) BlockMatchingOfType(ctx context.Context, root ipld.Link, match selector.Selector,
ptype ipld.NodePrototype, cb FetchCallback) error {

Expand All @@ -103,7 +111,9 @@ func (f *fetcherSession) BlockMatchingOfType(ctx context.Context, root ipld.Link
return err
}

return f.NodeMatching(ctx, node, match, cb)
progress := f.blankProgress(ctx)
progress.LastBlock.Link = root
return f.nodeMatching(ctx, progress, node, match, cb)
}

func (f *fetcherSession) PrototypeFromLink(lnk ipld.Link) (ipld.NodePrototype, error) {
Expand Down
62 changes: 15 additions & 47 deletions fetcher/fetcher_test.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,7 @@
package fetcher_test

import (
"bytes"
"context"
"fmt"
"io"
"strings"
"testing"
"time"
Expand All @@ -16,11 +13,10 @@ import (
tn "github.com/ipfs/go-bitswap/testnet"
blocks "github.com/ipfs/go-block-format"
"github.com/ipfs/go-blockservice"
"github.com/ipfs/go-cid"
"github.com/ipfs/go-fetcher/testutil"
delay "github.com/ipfs/go-ipfs-delay"
mockrouting "github.com/ipfs/go-ipfs-routing/mock"
"github.com/ipld/go-ipld-prime"
_ "github.com/ipld/go-ipld-prime/codec/dagcbor"
"github.com/ipld/go-ipld-prime/fluent"
cidlink "github.com/ipld/go-ipld-prime/linking/cid"
basicnode "github.com/ipld/go-ipld-prime/node/basic"
Expand All @@ -31,7 +27,7 @@ import (
)

func TestFetchIPLDPrimeNode(t *testing.T) {
block, node, _ := encodeBlock(fluent.MustBuildMap(basicnode.Prototype__Map{}, 3, func(na fluent.MapAssembler) {
block, node, _ := testutil.EncodeBlock(fluent.MustBuildMap(basicnode.Prototype__Map{}, 3, func(na fluent.MapAssembler) {
na.AssembleEntry("foo").AssignBool(true)
na.AssembleEntry("bar").AssignBool(false)
na.AssembleEntry("nested").CreateMap(2, func(na fluent.MapAssembler) {
Expand Down Expand Up @@ -66,17 +62,17 @@ func TestFetchIPLDPrimeNode(t *testing.T) {
}

func TestFetchIPLDGraph(t *testing.T) {
block3, node3, link3 := encodeBlock(fluent.MustBuildMap(basicnode.Prototype__Map{}, 1, func(na fluent.MapAssembler) {
block3, node3, link3 := testutil.EncodeBlock(fluent.MustBuildMap(basicnode.Prototype__Map{}, 1, func(na fluent.MapAssembler) {
na.AssembleEntry("three").AssignBool(true)
}))
block4, node4, link4 := encodeBlock(fluent.MustBuildMap(basicnode.Prototype__Map{}, 1, func(na fluent.MapAssembler) {
block4, node4, link4 := testutil.EncodeBlock(fluent.MustBuildMap(basicnode.Prototype__Map{}, 1, func(na fluent.MapAssembler) {
na.AssembleEntry("four").AssignBool(true)
}))
block2, node2, link2 := encodeBlock(fluent.MustBuildMap(basicnode.Prototype__Map{}, 2, func(na fluent.MapAssembler) {
block2, node2, link2 := testutil.EncodeBlock(fluent.MustBuildMap(basicnode.Prototype__Map{}, 2, func(na fluent.MapAssembler) {
na.AssembleEntry("link3").AssignLink(link3)
na.AssembleEntry("link4").AssignLink(link4)
}))
block1, node1, _ := encodeBlock(fluent.MustBuildMap(basicnode.Prototype__Map{}, 3, func(na fluent.MapAssembler) {
block1, node1, _ := testutil.EncodeBlock(fluent.MustBuildMap(basicnode.Prototype__Map{}, 3, func(na fluent.MapAssembler) {
na.AssembleEntry("foo").AssignBool(true)
na.AssembleEntry("bar").AssignBool(false)
na.AssembleEntry("nested").CreateMap(2, func(na fluent.MapAssembler) {
Expand Down Expand Up @@ -122,20 +118,20 @@ func TestFetchIPLDGraph(t *testing.T) {
}

func TestFetchIPLDPath(t *testing.T) {
block5, node5, link5 := encodeBlock(fluent.MustBuildMap(basicnode.Prototype__Map{}, 1, func(na fluent.MapAssembler) {
block5, node5, link5 := testutil.EncodeBlock(fluent.MustBuildMap(basicnode.Prototype__Map{}, 1, func(na fluent.MapAssembler) {
na.AssembleEntry("five").AssignBool(true)
}))
block3, _, link3 := encodeBlock(fluent.MustBuildMap(basicnode.Prototype__Map{}, 1, func(na fluent.MapAssembler) {
block3, _, link3 := testutil.EncodeBlock(fluent.MustBuildMap(basicnode.Prototype__Map{}, 1, func(na fluent.MapAssembler) {
na.AssembleEntry("three").AssignLink(link5)
}))
block4, _, link4 := encodeBlock(fluent.MustBuildMap(basicnode.Prototype__Map{}, 1, func(na fluent.MapAssembler) {
block4, _, link4 := testutil.EncodeBlock(fluent.MustBuildMap(basicnode.Prototype__Map{}, 1, func(na fluent.MapAssembler) {
na.AssembleEntry("four").AssignBool(true)
}))
block2, _, link2 := encodeBlock(fluent.MustBuildMap(basicnode.Prototype__Map{}, 2, func(na fluent.MapAssembler) {
block2, _, link2 := testutil.EncodeBlock(fluent.MustBuildMap(basicnode.Prototype__Map{}, 2, func(na fluent.MapAssembler) {
na.AssembleEntry("link3").AssignLink(link3)
na.AssembleEntry("link4").AssignLink(link4)
}))
block1, _, _ := encodeBlock(fluent.MustBuildMap(basicnode.Prototype__Map{}, 3, func(na fluent.MapAssembler) {
block1, _, _ := testutil.EncodeBlock(fluent.MustBuildMap(basicnode.Prototype__Map{}, 3, func(na fluent.MapAssembler) {
na.AssembleEntry("foo").AssignBool(true)
na.AssembleEntry("bar").AssignBool(false)
na.AssembleEntry("nested").CreateMap(2, func(na fluent.MapAssembler) {
Expand Down Expand Up @@ -189,17 +185,17 @@ func TestFetchIPLDPath(t *testing.T) {
}

func TestHelpers(t *testing.T) {
block3, node3, link3 := encodeBlock(fluent.MustBuildMap(basicnode.Prototype__Map{}, 1, func(na fluent.MapAssembler) {
block3, node3, link3 := testutil.EncodeBlock(fluent.MustBuildMap(basicnode.Prototype__Map{}, 1, func(na fluent.MapAssembler) {
na.AssembleEntry("three").AssignBool(true)
}))
block4, node4, link4 := encodeBlock(fluent.MustBuildMap(basicnode.Prototype__Map{}, 1, func(na fluent.MapAssembler) {
block4, node4, link4 := testutil.EncodeBlock(fluent.MustBuildMap(basicnode.Prototype__Map{}, 1, func(na fluent.MapAssembler) {
na.AssembleEntry("four").AssignBool(true)
}))
block2, node2, link2 := encodeBlock(fluent.MustBuildMap(basicnode.Prototype__Map{}, 2, func(na fluent.MapAssembler) {
block2, node2, link2 := testutil.EncodeBlock(fluent.MustBuildMap(basicnode.Prototype__Map{}, 2, func(na fluent.MapAssembler) {
na.AssembleEntry("link3").AssignLink(link3)
na.AssembleEntry("link4").AssignLink(link4)
}))
block1, node1, _ := encodeBlock(fluent.MustBuildMap(basicnode.Prototype__Map{}, 3, func(na fluent.MapAssembler) {
block1, node1, _ := testutil.EncodeBlock(fluent.MustBuildMap(basicnode.Prototype__Map{}, 3, func(na fluent.MapAssembler) {
na.AssembleEntry("foo").AssignBool(true)
na.AssembleEntry("bar").AssignBool(false)
na.AssembleEntry("nested").CreateMap(2, func(na fluent.MapAssembler) {
Expand Down Expand Up @@ -294,31 +290,3 @@ func assertNodesInOrder(t *testing.T, results []fetcher.FetchResult, nodeCount i

assert.Equal(t, nodeCount, len(results))
}

func encodeBlock(n ipld.Node) (blocks.Block, ipld.Node, ipld.Link) {
ls := cidlink.DefaultLinkSystem()
var b blocks.Block
lb := cidlink.LinkPrototype{cid.Prefix{
Version: 1,
Codec: 0x71,
MhType: 0x17,
MhLength: 20,
}}
ls.StorageWriteOpener = func(ipld.LinkContext) (io.Writer, ipld.BlockWriteCommitter, error) {
buf := bytes.Buffer{}
return &buf, func(lnk ipld.Link) error {
clnk, ok := lnk.(cidlink.Link)
if !ok {
return fmt.Errorf("incorrect link type %v", lnk)
}
var err error
b, err = blocks.NewBlockWithCid(buf.Bytes(), clnk.Cid)
return err
}, nil
}
lnk, err := ls.Store(ipld.LinkContext{}, lb, n)
if err != nil {
panic(err)
}
return b, n, lnk
}
43 changes: 43 additions & 0 deletions fetcher/helpers/block_visitor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package helpers

import (
"github.com/ipfs/go-cid"
"github.com/ipfs/go-fetcher"
"github.com/ipld/go-ipld-prime"
cidlink "github.com/ipld/go-ipld-prime/linking/cid"
)

// BlockResult specifies a node at the top of a block boundary
type BlockResult struct {
Node ipld.Node
Link ipld.Link
}

// BlockCallback is a callback for visiting blocks
type BlockCallback func(BlockResult) error

// OnBlocks produces a fetch call back that only gets called when visiting blocks during a fetch
func OnBlocks(bv BlockCallback) fetcher.FetchCallback {
return func(fr fetcher.FetchResult) error {
if fr.LastBlockPath.String() == fr.Path.String() {
return bv(BlockResult{
Node: fr.Node,
Link: fr.LastBlockLink,
})
}
return nil
}
}

// OnUniqueBlocks is a callback that only gets called visiting each block once
func OnUniqueBlocks(bv BlockCallback) fetcher.FetchCallback {
set := cid.NewSet()
return OnBlocks(func(br BlockResult) error {
c := br.Link.(cidlink.Link).Cid
if set.Has(c) {
return nil
}
set.Add(c)
return bv(br)
})
}
140 changes: 140 additions & 0 deletions fetcher/helpers/block_visitor_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
package helpers_test

import (
"context"
"testing"
"time"

testinstance "github.com/ipfs/go-bitswap/testinstance"
tn "github.com/ipfs/go-bitswap/testnet"
"github.com/ipfs/go-blockservice"
"github.com/ipfs/go-fetcher"
"github.com/ipfs/go-fetcher/helpers"
"github.com/ipfs/go-fetcher/testutil"
delay "github.com/ipfs/go-ipfs-delay"
mockrouting "github.com/ipfs/go-ipfs-routing/mock"
"github.com/ipld/go-ipld-prime"
"github.com/ipld/go-ipld-prime/fluent"
cidlink "github.com/ipld/go-ipld-prime/linking/cid"
basicnode "github.com/ipld/go-ipld-prime/node/basic"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestFetchGraphToBlocks(t *testing.T) {
block3, node3, link3 := testutil.EncodeBlock(fluent.MustBuildMap(basicnode.Prototype__Map{}, 1, func(na fluent.MapAssembler) {
na.AssembleEntry("three").AssignBool(true)
}))
block4, node4, link4 := testutil.EncodeBlock(fluent.MustBuildMap(basicnode.Prototype__Map{}, 1, func(na fluent.MapAssembler) {
na.AssembleEntry("four").AssignBool(true)
}))
block2, node2, link2 := testutil.EncodeBlock(fluent.MustBuildMap(basicnode.Prototype__Map{}, 2, func(na fluent.MapAssembler) {
na.AssembleEntry("link3").AssignLink(link3)
na.AssembleEntry("link4").AssignLink(link4)
}))
block1, node1, _ := testutil.EncodeBlock(fluent.MustBuildMap(basicnode.Prototype__Map{}, 3, func(na fluent.MapAssembler) {
na.AssembleEntry("foo").AssignBool(true)
na.AssembleEntry("bar").AssignBool(false)
na.AssembleEntry("nested").CreateMap(2, func(na fluent.MapAssembler) {
na.AssembleEntry("link2").AssignLink(link2)
na.AssembleEntry("nonlink").AssignString("zoo")
})
}))

net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(0*time.Millisecond))
ig := testinstance.NewTestInstanceGenerator(net, nil, nil)
defer ig.Close()

peers := ig.Instances(2)
hasBlock := peers[0]
defer hasBlock.Exchange.Close()

err := hasBlock.Exchange.HasBlock(block1)
require.NoError(t, err)
err = hasBlock.Exchange.HasBlock(block2)
require.NoError(t, err)
err = hasBlock.Exchange.HasBlock(block3)
require.NoError(t, err)
err = hasBlock.Exchange.HasBlock(block4)
require.NoError(t, err)

wantsBlock := peers[1]
defer wantsBlock.Exchange.Close()

wantsGetter := blockservice.New(wantsBlock.Blockstore(), wantsBlock.Exchange)
fetcherConfig := fetcher.NewFetcherConfig(wantsGetter)
session := fetcherConfig.NewSession(context.Background())
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()

results := []helpers.BlockResult{}
err = fetcher.BlockAll(ctx, session, cidlink.Link{Cid: block1.Cid()}, helpers.OnBlocks(func(res helpers.BlockResult) error {
results = append(results, res)
return nil
}))
require.NoError(t, err)

assertBlocksInOrder(t, results, 4, map[int]ipld.Node{0: node1, 1: node2, 2: node3, 3: node4})
}

func TestFetchGraphToUniqueBlocks(t *testing.T) {
block3, node3, link3 := testutil.EncodeBlock(fluent.MustBuildMap(basicnode.Prototype__Map{}, 1, func(na fluent.MapAssembler) {
na.AssembleEntry("three").AssignBool(true)
}))
block2, node2, link2 := testutil.EncodeBlock(fluent.MustBuildMap(basicnode.Prototype__Map{}, 2, func(na fluent.MapAssembler) {
na.AssembleEntry("link3").AssignLink(link3)
}))
block1, node1, _ := testutil.EncodeBlock(fluent.MustBuildMap(basicnode.Prototype__Map{}, 3, func(na fluent.MapAssembler) {
na.AssembleEntry("foo").AssignBool(true)
na.AssembleEntry("bar").AssignBool(false)
na.AssembleEntry("nested").CreateMap(2, func(na fluent.MapAssembler) {
na.AssembleEntry("link3").AssignLink(link3)
na.AssembleEntry("link2").AssignLink(link2)
na.AssembleEntry("nonlink").AssignString("zoo")
})
}))

net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(0*time.Millisecond))
ig := testinstance.NewTestInstanceGenerator(net, nil, nil)
defer ig.Close()

peers := ig.Instances(2)
hasBlock := peers[0]
defer hasBlock.Exchange.Close()

err := hasBlock.Exchange.HasBlock(block1)
require.NoError(t, err)
err = hasBlock.Exchange.HasBlock(block2)
require.NoError(t, err)
err = hasBlock.Exchange.HasBlock(block3)
require.NoError(t, err)

wantsBlock := peers[1]
defer wantsBlock.Exchange.Close()

wantsGetter := blockservice.New(wantsBlock.Blockstore(), wantsBlock.Exchange)
fetcherConfig := fetcher.NewFetcherConfig(wantsGetter)
session := fetcherConfig.NewSession(context.Background())
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()

results := []helpers.BlockResult{}
err = fetcher.BlockAll(ctx, session, cidlink.Link{Cid: block1.Cid()}, helpers.OnUniqueBlocks(func(res helpers.BlockResult) error {
results = append(results, res)
return nil
}))
require.NoError(t, err)

assertBlocksInOrder(t, results, 3, map[int]ipld.Node{0: node1, 1: node3, 2: node2})
}

func assertBlocksInOrder(t *testing.T, results []helpers.BlockResult, nodeCount int, nodes map[int]ipld.Node) {
for order, res := range results {
expectedNode, ok := nodes[order]
if ok {
assert.Equal(t, expectedNode, res.Node)
}
}

assert.Equal(t, nodeCount, len(results))
}
Loading

0 comments on commit 08eff5c

Please sign in to comment.