forked from ipfs/boxo
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat(helpers): add helpers for block visiting
This commit was moved from ipfs/go-fetcher@be81ab7
- Loading branch information
1 parent
b52410a
commit 33c53ee
Showing
5 changed files
with
258 additions
and
55 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,43 @@ | ||
package helpers | ||
|
||
import ( | ||
"github.com/ipfs/go-cid" | ||
"github.com/ipfs/go-fetcher" | ||
"github.com/ipld/go-ipld-prime" | ||
cidlink "github.com/ipld/go-ipld-prime/linking/cid" | ||
) | ||
|
||
// BlockResult specifies a node at the top of a block boundary | ||
type BlockResult struct { | ||
Node ipld.Node | ||
Link ipld.Link | ||
} | ||
|
||
// BlockCallback is a callback for visiting blocks | ||
type BlockCallback func(BlockResult) error | ||
|
||
// OnBlocks produces a fetch call back that only gets called when visiting blocks during a fetch | ||
func OnBlocks(bv BlockCallback) fetcher.FetchCallback { | ||
return func(fr fetcher.FetchResult) error { | ||
if fr.LastBlockPath.String() == fr.Path.String() { | ||
return bv(BlockResult{ | ||
Node: fr.Node, | ||
Link: fr.LastBlockLink, | ||
}) | ||
} | ||
return nil | ||
} | ||
} | ||
|
||
// OnUniqueBlocks is a callback that only gets called visiting each block once | ||
func OnUniqueBlocks(bv BlockCallback) fetcher.FetchCallback { | ||
set := cid.NewSet() | ||
return OnBlocks(func(br BlockResult) error { | ||
c := br.Link.(cidlink.Link).Cid | ||
if set.Has(c) { | ||
return nil | ||
} | ||
set.Add(c) | ||
return bv(br) | ||
}) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,140 @@ | ||
package helpers_test | ||
|
||
import ( | ||
"context" | ||
"testing" | ||
"time" | ||
|
||
testinstance "github.com/ipfs/go-bitswap/testinstance" | ||
tn "github.com/ipfs/go-bitswap/testnet" | ||
"github.com/ipfs/go-blockservice" | ||
"github.com/ipfs/go-fetcher" | ||
"github.com/ipfs/go-fetcher/helpers" | ||
"github.com/ipfs/go-fetcher/testutil" | ||
delay "github.com/ipfs/go-ipfs-delay" | ||
mockrouting "github.com/ipfs/go-ipfs-routing/mock" | ||
"github.com/ipld/go-ipld-prime" | ||
"github.com/ipld/go-ipld-prime/fluent" | ||
cidlink "github.com/ipld/go-ipld-prime/linking/cid" | ||
basicnode "github.com/ipld/go-ipld-prime/node/basic" | ||
"github.com/stretchr/testify/assert" | ||
"github.com/stretchr/testify/require" | ||
) | ||
|
||
func TestFetchGraphToBlocks(t *testing.T) { | ||
block3, node3, link3 := testutil.EncodeBlock(fluent.MustBuildMap(basicnode.Prototype__Map{}, 1, func(na fluent.MapAssembler) { | ||
na.AssembleEntry("three").AssignBool(true) | ||
})) | ||
block4, node4, link4 := testutil.EncodeBlock(fluent.MustBuildMap(basicnode.Prototype__Map{}, 1, func(na fluent.MapAssembler) { | ||
na.AssembleEntry("four").AssignBool(true) | ||
})) | ||
block2, node2, link2 := testutil.EncodeBlock(fluent.MustBuildMap(basicnode.Prototype__Map{}, 2, func(na fluent.MapAssembler) { | ||
na.AssembleEntry("link3").AssignLink(link3) | ||
na.AssembleEntry("link4").AssignLink(link4) | ||
})) | ||
block1, node1, _ := testutil.EncodeBlock(fluent.MustBuildMap(basicnode.Prototype__Map{}, 3, func(na fluent.MapAssembler) { | ||
na.AssembleEntry("foo").AssignBool(true) | ||
na.AssembleEntry("bar").AssignBool(false) | ||
na.AssembleEntry("nested").CreateMap(2, func(na fluent.MapAssembler) { | ||
na.AssembleEntry("link2").AssignLink(link2) | ||
na.AssembleEntry("nonlink").AssignString("zoo") | ||
}) | ||
})) | ||
|
||
net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(0*time.Millisecond)) | ||
ig := testinstance.NewTestInstanceGenerator(net, nil, nil) | ||
defer ig.Close() | ||
|
||
peers := ig.Instances(2) | ||
hasBlock := peers[0] | ||
defer hasBlock.Exchange.Close() | ||
|
||
err := hasBlock.Exchange.HasBlock(block1) | ||
require.NoError(t, err) | ||
err = hasBlock.Exchange.HasBlock(block2) | ||
require.NoError(t, err) | ||
err = hasBlock.Exchange.HasBlock(block3) | ||
require.NoError(t, err) | ||
err = hasBlock.Exchange.HasBlock(block4) | ||
require.NoError(t, err) | ||
|
||
wantsBlock := peers[1] | ||
defer wantsBlock.Exchange.Close() | ||
|
||
wantsGetter := blockservice.New(wantsBlock.Blockstore(), wantsBlock.Exchange) | ||
fetcherConfig := fetcher.NewFetcherConfig(wantsGetter) | ||
session := fetcherConfig.NewSession(context.Background()) | ||
ctx, cancel := context.WithTimeout(context.Background(), time.Second) | ||
defer cancel() | ||
|
||
results := []helpers.BlockResult{} | ||
err = fetcher.BlockAll(ctx, session, cidlink.Link{Cid: block1.Cid()}, helpers.OnBlocks(func(res helpers.BlockResult) error { | ||
results = append(results, res) | ||
return nil | ||
})) | ||
require.NoError(t, err) | ||
|
||
assertBlocksInOrder(t, results, 4, map[int]ipld.Node{0: node1, 1: node2, 2: node3, 3: node4}) | ||
} | ||
|
||
func TestFetchGraphToUniqueBlocks(t *testing.T) { | ||
block3, node3, link3 := testutil.EncodeBlock(fluent.MustBuildMap(basicnode.Prototype__Map{}, 1, func(na fluent.MapAssembler) { | ||
na.AssembleEntry("three").AssignBool(true) | ||
})) | ||
block2, node2, link2 := testutil.EncodeBlock(fluent.MustBuildMap(basicnode.Prototype__Map{}, 2, func(na fluent.MapAssembler) { | ||
na.AssembleEntry("link3").AssignLink(link3) | ||
})) | ||
block1, node1, _ := testutil.EncodeBlock(fluent.MustBuildMap(basicnode.Prototype__Map{}, 3, func(na fluent.MapAssembler) { | ||
na.AssembleEntry("foo").AssignBool(true) | ||
na.AssembleEntry("bar").AssignBool(false) | ||
na.AssembleEntry("nested").CreateMap(2, func(na fluent.MapAssembler) { | ||
na.AssembleEntry("link3").AssignLink(link3) | ||
na.AssembleEntry("link2").AssignLink(link2) | ||
na.AssembleEntry("nonlink").AssignString("zoo") | ||
}) | ||
})) | ||
|
||
net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(0*time.Millisecond)) | ||
ig := testinstance.NewTestInstanceGenerator(net, nil, nil) | ||
defer ig.Close() | ||
|
||
peers := ig.Instances(2) | ||
hasBlock := peers[0] | ||
defer hasBlock.Exchange.Close() | ||
|
||
err := hasBlock.Exchange.HasBlock(block1) | ||
require.NoError(t, err) | ||
err = hasBlock.Exchange.HasBlock(block2) | ||
require.NoError(t, err) | ||
err = hasBlock.Exchange.HasBlock(block3) | ||
require.NoError(t, err) | ||
|
||
wantsBlock := peers[1] | ||
defer wantsBlock.Exchange.Close() | ||
|
||
wantsGetter := blockservice.New(wantsBlock.Blockstore(), wantsBlock.Exchange) | ||
fetcherConfig := fetcher.NewFetcherConfig(wantsGetter) | ||
session := fetcherConfig.NewSession(context.Background()) | ||
ctx, cancel := context.WithTimeout(context.Background(), time.Second) | ||
defer cancel() | ||
|
||
results := []helpers.BlockResult{} | ||
err = fetcher.BlockAll(ctx, session, cidlink.Link{Cid: block1.Cid()}, helpers.OnUniqueBlocks(func(res helpers.BlockResult) error { | ||
results = append(results, res) | ||
return nil | ||
})) | ||
require.NoError(t, err) | ||
|
||
assertBlocksInOrder(t, results, 3, map[int]ipld.Node{0: node1, 1: node3, 2: node2}) | ||
} | ||
|
||
func assertBlocksInOrder(t *testing.T, results []helpers.BlockResult, nodeCount int, nodes map[int]ipld.Node) { | ||
for order, res := range results { | ||
expectedNode, ok := nodes[order] | ||
if ok { | ||
assert.Equal(t, expectedNode, res.Node) | ||
} | ||
} | ||
|
||
assert.Equal(t, nodeCount, len(results)) | ||
} |
Oops, something went wrong.