Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add selector to BlockIO classes #178

Merged
merged 3 commits into from
Apr 1, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions retrievalmarket/impl/blockio/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,20 +22,22 @@ type BlockReader interface {
// allowing the next block to be read and then advancing no further
type SelectorBlockReader struct {
root ipld.Link
selector ipld.Node
loader ipld.Loader
traverser *Traverser
}

// NewSelectorBlockReader returns a new Block reader starting at the given
// root and using the given loader
func NewSelectorBlockReader(root ipld.Link, loader ipld.Loader) BlockReader {
return &SelectorBlockReader{root, loader, nil}
func NewSelectorBlockReader(root ipld.Link, sel ipld.Node, loader ipld.Loader) BlockReader {
return &SelectorBlockReader{root, sel, loader, nil}
}

// ReadBlock reads the next block in the IPLD traversal
func (sr *SelectorBlockReader) ReadBlock(ctx context.Context) (retrievalmarket.Block, bool, error) {

if sr.traverser == nil {
sr.traverser = NewTraverser(sr.root)
sr.traverser = NewTraverser(sr.root, sr.selector)
sr.traverser.Start(ctx)
}
lnk, lnkCtx := sr.traverser.CurrentRequest(ctx)
Expand Down
8 changes: 7 additions & 1 deletion retrievalmarket/impl/blockio/reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@ import (

blocks "github.com/ipfs/go-block-format"
"github.com/ipfs/go-cid"
ipldfree "github.com/ipld/go-ipld-prime/impl/free"
"github.com/ipld/go-ipld-prime/traversal/selector"
"github.com/ipld/go-ipld-prime/traversal/selector/builder"
"github.com/stretchr/testify/require"

"github.com/filecoin-project/go-fil-markets/retrievalmarket/impl/blockio"
Expand All @@ -16,8 +19,11 @@ func TestSelectorReader(t *testing.T) {
ctx := context.Background()
testdata := tut.NewTestIPLDTree()

ssb := builder.NewSelectorSpecBuilder(ipldfree.NodeBuilder())
sel := ssb.ExploreRecursive(selector.RecursionLimitNone(), ssb.ExploreAll(ssb.ExploreRecursiveEdge())).Node()

t.Run("reads correctly", func(t *testing.T) {
reader := blockio.NewSelectorBlockReader(testdata.RootNodeLnk, testdata.Loader)
reader := blockio.NewSelectorBlockReader(testdata.RootNodeLnk, sel, testdata.Loader)

checkReadSequence(ctx, t, reader, []blocks.Block{
testdata.RootBlock,
Expand Down
11 changes: 5 additions & 6 deletions retrievalmarket/impl/blockio/traverser.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
free "github.com/ipld/go-ipld-prime/impl/free"
"github.com/ipld/go-ipld-prime/traversal"
"github.com/ipld/go-ipld-prime/traversal/selector"
"github.com/ipld/go-ipld-prime/traversal/selector/builder"
)

type state struct {
Expand All @@ -28,6 +27,7 @@ type nextResponse struct {
// and waits for manual input (in the form of advance or error)
type Traverser struct {
root ipld.Link
selector ipld.Node
currentLink ipld.Link
currentContext ipld.LinkContext
isDone bool
Expand All @@ -51,9 +51,10 @@ func (t *Traverser) checkState(ctx context.Context) {
}

// NewTraverser creates a new traverser
func NewTraverser(root ipld.Link) *Traverser {
func NewTraverser(root ipld.Link, selector ipld.Node) *Traverser {
return &Traverser{
root: root,
selector: selector,
awaitRequest: make(chan struct{}, 1),
stateChan: make(chan state, 1),
responses: make(chan nextResponse),
Expand Down Expand Up @@ -97,10 +98,8 @@ func (t *Traverser) Start(ctx context.Context) {
t.writeDone(ctx)
return
}
ssb := builder.NewSelectorSpecBuilder(free.NodeBuilder())

allSelector, err := ssb.ExploreRecursive(selector.RecursionLimitNone(),
ssb.ExploreAll(ssb.ExploreRecursiveEdge())).Selector()
sel, err := selector.ParseSelector(t.selector)
if err != nil {
t.writeDone(ctx)
return
Expand All @@ -111,7 +110,7 @@ func (t *Traverser) Start(ctx context.Context) {
LinkLoader: loader,
LinkNodeBuilderChooser: chooser,
},
}.WalkAdv(nd, allSelector, func(traversal.Progress, ipld.Node, traversal.VisitReason) error { return nil })
}.WalkAdv(nd, sel, func(traversal.Progress, ipld.Node, traversal.VisitReason) error { return nil })
t.writeDone(ctx)
}()

Expand Down
8 changes: 7 additions & 1 deletion retrievalmarket/impl/blockio/traverser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,10 @@ import (
"testing"

blocks "github.com/ipfs/go-block-format"
ipldfree "github.com/ipld/go-ipld-prime/impl/free"
cidlink "github.com/ipld/go-ipld-prime/linking/cid"
"github.com/ipld/go-ipld-prime/traversal/selector"
"github.com/ipld/go-ipld-prime/traversal/selector/builder"
"github.com/stretchr/testify/require"

"github.com/filecoin-project/go-fil-markets/retrievalmarket/impl/blockio"
Expand All @@ -17,8 +20,11 @@ func TestTraverser(t *testing.T) {
ctx := context.Background()
testdata := tut.NewTestIPLDTree()

ssb := builder.NewSelectorSpecBuilder(ipldfree.NodeBuilder())
sel := ssb.ExploreRecursive(selector.RecursionLimitNone(), ssb.ExploreAll(ssb.ExploreRecursiveEdge())).Node()

t.Run("traverses correctly", func(t *testing.T) {
traverser := blockio.NewTraverser(testdata.RootNodeLnk)
traverser := blockio.NewTraverser(testdata.RootNodeLnk, sel)
traverser.Start(ctx)
checkTraverseSequence(ctx, t, traverser, []blocks.Block{
testdata.RootBlock,
Expand Down
7 changes: 4 additions & 3 deletions retrievalmarket/impl/blockio/verify.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,19 +21,20 @@ type BlockVerifier interface {
// in the order they are traversed in a dag walk
type SelectorVerifier struct {
root ipld.Link
selector ipld.Node
traverser *Traverser
}

// NewSelectorVerifier returns a new selector based block verifier
func NewSelectorVerifier(root ipld.Link) BlockVerifier {
return &SelectorVerifier{root, nil}
func NewSelectorVerifier(root ipld.Link, selector ipld.Node) BlockVerifier {
return &SelectorVerifier{root, selector, nil}
}

// Verify verifies that the given block is the next one needed for the current traversal
// and returns true if the traversal is done
func (sv *SelectorVerifier) Verify(ctx context.Context, blk blocks.Block) (done bool, err error) {
if sv.traverser == nil {
sv.traverser = NewTraverser(sv.root)
sv.traverser = NewTraverser(sv.root, sv.selector)
sv.traverser.Start(ctx)
}
if sv.traverser.IsComplete(ctx) {
Expand Down
15 changes: 11 additions & 4 deletions retrievalmarket/impl/blockio/verify_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@ import (
"testing"

blocks "github.com/ipfs/go-block-format"
ipldfree "github.com/ipld/go-ipld-prime/impl/free"
"github.com/ipld/go-ipld-prime/traversal/selector"
"github.com/ipld/go-ipld-prime/traversal/selector/builder"
"github.com/stretchr/testify/require"

"github.com/filecoin-project/go-fil-markets/retrievalmarket/impl/blockio"
Expand All @@ -15,8 +18,11 @@ func TestSelectorVerifier(t *testing.T) {
ctx := context.Background()
testdata := tut.NewTestIPLDTree()

ssb := builder.NewSelectorSpecBuilder(ipldfree.NodeBuilder())
sel := ssb.ExploreRecursive(selector.RecursionLimitNone(), ssb.ExploreAll(ssb.ExploreRecursiveEdge())).Node()

t.Run("verifies correctly", func(t *testing.T) {
verifier := blockio.NewSelectorVerifier(testdata.RootNodeLnk)
verifier := blockio.NewSelectorVerifier(testdata.RootNodeLnk, sel)
checkVerifySequence(ctx, t, verifier, false, []blocks.Block{
testdata.RootBlock,
testdata.LeafAlphaBlock,
Expand All @@ -29,15 +35,16 @@ func TestSelectorVerifier(t *testing.T) {
testdata.LeafAlphaBlock,
})
})

t.Run("fed incorrect block", func(t *testing.T) {
t.Run("right away", func(t *testing.T) {
verifier := blockio.NewSelectorVerifier(testdata.RootNodeLnk)
verifier := blockio.NewSelectorVerifier(testdata.RootNodeLnk, sel)
checkVerifySequence(ctx, t, verifier, true, []blocks.Block{
testdata.LeafAlphaBlock,
})
})
t.Run("in middle", func(t *testing.T) {
verifier := blockio.NewSelectorVerifier(testdata.RootNodeLnk)
verifier := blockio.NewSelectorVerifier(testdata.RootNodeLnk, sel)
checkVerifySequence(ctx, t, verifier, true, []blocks.Block{
testdata.RootBlock,
testdata.LeafAlphaBlock,
Expand All @@ -46,7 +53,7 @@ func TestSelectorVerifier(t *testing.T) {
})
})
t.Run("at end", func(t *testing.T) {
verifier := blockio.NewSelectorVerifier(testdata.RootNodeLnk)
verifier := blockio.NewSelectorVerifier(testdata.RootNodeLnk, sel)
checkVerifySequence(ctx, t, verifier, true, []blocks.Block{
testdata.RootBlock,
testdata.LeafAlphaBlock,
Expand Down
3 changes: 2 additions & 1 deletion retrievalmarket/impl/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,8 @@ func (c *client) Retrieve(ctx context.Context, payloadCID cid.Cid, params retrie
}

c.dealStreams[dealID] = s
c.blockVerifiers[dealID] = blockio.NewSelectorVerifier(cidlink.Link{Cid: dealState.DealProposal.PayloadCID})

c.blockVerifiers[dealID] = blockio.NewSelectorVerifier(cidlink.Link{Cid: dealState.DealProposal.PayloadCID}, allSelector())

err = c.stateMachines.Send(dealState.ID, retrievalmarket.ClientEventOpen)
if err != nil {
Expand Down
15 changes: 15 additions & 0 deletions retrievalmarket/impl/common.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package retrievalimpl

import (
"github.com/ipld/go-ipld-prime"
ipldfree "github.com/ipld/go-ipld-prime/impl/free"
"github.com/ipld/go-ipld-prime/traversal/selector"
"github.com/ipld/go-ipld-prime/traversal/selector/builder"
)

func allSelector() ipld.Node {
ssb := builder.NewSelectorSpecBuilder(ipldfree.NodeBuilder())
return ssb.ExploreRecursive(selector.RecursionLimitNone(),
ssb.ExploreAll(ssb.ExploreRecursiveEdge())).
Node()
}
3 changes: 2 additions & 1 deletion retrievalmarket/impl/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,8 @@ func (p *provider) newProviderDeal(stream rmnet.RetrievalDealStream) error {
p.dealStreams[pds.Identifier()] = stream

loaderWithUnsealing := blockunsealing.NewLoaderWithUnsealing(context.TODO(), p.bs, p.pieceStore, cario.NewCarIO(), p.node.UnsealSector)
br := blockio.NewSelectorBlockReader(cidlink.Link{Cid: dealProposal.PayloadCID}, loaderWithUnsealing.Load)

br := blockio.NewSelectorBlockReader(cidlink.Link{Cid: dealProposal.PayloadCID}, allSelector(), loaderWithUnsealing.Load)
p.blockReaders[pds.Identifier()] = br

// start the deal processing, synchronously so we can log the error and close the stream if it doesn't start
Expand Down