From 987c59109eede45bf8f044b48b27723acf48e832 Mon Sep 17 00:00:00 2001 From: shannonwells Date: Wed, 1 Apr 2020 15:15:02 -0700 Subject: [PATCH 1/3] add selector to BlockIO classes --- retrievalmarket/impl/blockio/reader.go | 8 +++++--- retrievalmarket/impl/blockio/reader_test.go | 10 +++++++++- retrievalmarket/impl/blockio/traverser.go | 14 ++++---------- retrievalmarket/impl/blockio/traverser_test.go | 10 +++++++++- retrievalmarket/impl/blockio/verify.go | 8 +++++--- retrievalmarket/impl/blockio/verify_test.go | 17 +++++++++++++---- retrievalmarket/impl/client.go | 7 ++++++- retrievalmarket/impl/common.go | 13 +++++++++++++ retrievalmarket/impl/provider.go | 7 ++++++- 9 files changed, 70 insertions(+), 24 deletions(-) create mode 100644 retrievalmarket/impl/common.go diff --git a/retrievalmarket/impl/blockio/reader.go b/retrievalmarket/impl/blockio/reader.go index 065959fb..4d0c72f6 100644 --- a/retrievalmarket/impl/blockio/reader.go +++ b/retrievalmarket/impl/blockio/reader.go @@ -7,6 +7,7 @@ import ( "github.com/ipld/go-ipld-prime" cidlink "github.com/ipld/go-ipld-prime/linking/cid" + "github.com/ipld/go-ipld-prime/traversal/selector" "github.com/filecoin-project/go-fil-markets/retrievalmarket" ) @@ -22,20 +23,21 @@ type BlockReader interface { // allowing the next block to be read and then advancing no further type SelectorBlockReader struct { root ipld.Link + sel selector.Selector loader ipld.Loader traverser *Traverser } // NewSelectorBlockReader returns a new Block reader starting at the given // root and using the given loader -func NewSelectorBlockReader(root ipld.Link, loader ipld.Loader) BlockReader { - return &SelectorBlockReader{root, loader, nil} +func NewSelectorBlockReader(root ipld.Link, sel selector.Selector, loader ipld.Loader) BlockReader { + return &SelectorBlockReader{root, sel, loader, nil} } // ReadBlock reads the next block in the IPLD traversal func (sr *SelectorBlockReader) ReadBlock(ctx context.Context) (retrievalmarket.Block, bool, error) { if sr.traverser == nil { - sr.traverser = NewTraverser(sr.root) + sr.traverser = NewTraverser(sr.root, sr.sel) sr.traverser.Start(ctx) } lnk, lnkCtx := sr.traverser.CurrentRequest(ctx) diff --git a/retrievalmarket/impl/blockio/reader_test.go b/retrievalmarket/impl/blockio/reader_test.go index e0c29777..25d878d9 100644 --- a/retrievalmarket/impl/blockio/reader_test.go +++ b/retrievalmarket/impl/blockio/reader_test.go @@ -6,6 +6,9 @@ import ( blocks "github.com/ipfs/go-block-format" "github.com/ipfs/go-cid" + ipldfree "github.com/ipld/go-ipld-prime/impl/free" + "github.com/ipld/go-ipld-prime/traversal/selector" + "github.com/ipld/go-ipld-prime/traversal/selector/builder" "github.com/stretchr/testify/require" "github.com/filecoin-project/go-fil-markets/retrievalmarket/impl/blockio" @@ -16,8 +19,13 @@ func TestSelectorReader(t *testing.T) { ctx := context.Background() testdata := tut.NewTestIPLDTree() + ssb := builder.NewSelectorSpecBuilder(ipldfree.NodeBuilder()) + allSelector := ssb.ExploreRecursive(selector.RecursionLimitNone(), ssb.ExploreAll(ssb.ExploreRecursiveEdge())) + sel, err := allSelector.Selector() + require.NoError(t, err) + t.Run("reads correctly", func(t *testing.T) { - reader := blockio.NewSelectorBlockReader(testdata.RootNodeLnk, testdata.Loader) + reader := blockio.NewSelectorBlockReader(testdata.RootNodeLnk, sel, testdata.Loader) checkReadSequence(ctx, t, reader, []blocks.Block{ testdata.RootBlock, diff --git a/retrievalmarket/impl/blockio/traverser.go b/retrievalmarket/impl/blockio/traverser.go index 3fd28bc1..b90e1587 100644 --- a/retrievalmarket/impl/blockio/traverser.go +++ b/retrievalmarket/impl/blockio/traverser.go @@ -10,7 +10,6 @@ import ( free "github.com/ipld/go-ipld-prime/impl/free" "github.com/ipld/go-ipld-prime/traversal" "github.com/ipld/go-ipld-prime/traversal/selector" - "github.com/ipld/go-ipld-prime/traversal/selector/builder" ) type state struct { @@ -28,6 +27,7 @@ type nextResponse struct { // and waits for manual input (in the form of advance or error) type Traverser struct { root ipld.Link + selector selector.Selector currentLink ipld.Link currentContext ipld.LinkContext isDone bool @@ -51,9 +51,10 @@ func (t *Traverser) checkState(ctx context.Context) { } // NewTraverser creates a new traverser -func NewTraverser(root ipld.Link) *Traverser { +func NewTraverser(root ipld.Link, selector selector.Selector) *Traverser { return &Traverser{ root: root, + selector: selector, awaitRequest: make(chan struct{}, 1), stateChan: make(chan state, 1), responses: make(chan nextResponse), @@ -97,21 +98,14 @@ func (t *Traverser) Start(ctx context.Context) { t.writeDone(ctx) return } - ssb := builder.NewSelectorSpecBuilder(free.NodeBuilder()) - allSelector, err := ssb.ExploreRecursive(selector.RecursionLimitNone(), - ssb.ExploreAll(ssb.ExploreRecursiveEdge())).Selector() - if err != nil { - t.writeDone(ctx) - return - } _ = traversal.Progress{ Cfg: &traversal.Config{ Ctx: ctx, LinkLoader: loader, LinkNodeBuilderChooser: chooser, }, - }.WalkAdv(nd, allSelector, func(traversal.Progress, ipld.Node, traversal.VisitReason) error { return nil }) + }.WalkAdv(nd, t.selector, func(traversal.Progress, ipld.Node, traversal.VisitReason) error { return nil }) t.writeDone(ctx) }() diff --git a/retrievalmarket/impl/blockio/traverser_test.go b/retrievalmarket/impl/blockio/traverser_test.go index e4d781a0..1c999e6f 100644 --- a/retrievalmarket/impl/blockio/traverser_test.go +++ b/retrievalmarket/impl/blockio/traverser_test.go @@ -6,7 +6,10 @@ import ( "testing" blocks "github.com/ipfs/go-block-format" + ipldfree "github.com/ipld/go-ipld-prime/impl/free" cidlink "github.com/ipld/go-ipld-prime/linking/cid" + "github.com/ipld/go-ipld-prime/traversal/selector" + "github.com/ipld/go-ipld-prime/traversal/selector/builder" "github.com/stretchr/testify/require" "github.com/filecoin-project/go-fil-markets/retrievalmarket/impl/blockio" @@ -17,8 +20,13 @@ func TestTraverser(t *testing.T) { ctx := context.Background() testdata := tut.NewTestIPLDTree() + ssb := builder.NewSelectorSpecBuilder(ipldfree.NodeBuilder()) + allSelector := ssb.ExploreRecursive(selector.RecursionLimitNone(), ssb.ExploreAll(ssb.ExploreRecursiveEdge())) + sel, err := allSelector.Selector() + require.NoError(t, err) + t.Run("traverses correctly", func(t *testing.T) { - traverser := blockio.NewTraverser(testdata.RootNodeLnk) + traverser := blockio.NewTraverser(testdata.RootNodeLnk, sel) traverser.Start(ctx) checkTraverseSequence(ctx, t, traverser, []blocks.Block{ testdata.RootBlock, diff --git a/retrievalmarket/impl/blockio/verify.go b/retrievalmarket/impl/blockio/verify.go index f5ce6980..fc4e3f88 100644 --- a/retrievalmarket/impl/blockio/verify.go +++ b/retrievalmarket/impl/blockio/verify.go @@ -7,6 +7,7 @@ import ( blocks "github.com/ipfs/go-block-format" "github.com/ipld/go-ipld-prime" cidlink "github.com/ipld/go-ipld-prime/linking/cid" + "github.com/ipld/go-ipld-prime/traversal/selector" "github.com/filecoin-project/go-fil-markets/retrievalmarket" ) @@ -21,19 +22,20 @@ type BlockVerifier interface { // in the order they are traversed in a dag walk type SelectorVerifier struct { root ipld.Link + sel selector.Selector traverser *Traverser } // NewSelectorVerifier returns a new selector based block verifier -func NewSelectorVerifier(root ipld.Link) BlockVerifier { - return &SelectorVerifier{root, nil} +func NewSelectorVerifier(root ipld.Link, sel selector.Selector) BlockVerifier { + return &SelectorVerifier{root, sel, nil} } // Verify verifies that the given block is the next one needed for the current traversal // and returns true if the traversal is done func (sv *SelectorVerifier) Verify(ctx context.Context, blk blocks.Block) (done bool, err error) { if sv.traverser == nil { - sv.traverser = NewTraverser(sv.root) + sv.traverser = NewTraverser(sv.root, sv.sel) sv.traverser.Start(ctx) } if sv.traverser.IsComplete(ctx) { diff --git a/retrievalmarket/impl/blockio/verify_test.go b/retrievalmarket/impl/blockio/verify_test.go index 904b4bea..ef869f10 100644 --- a/retrievalmarket/impl/blockio/verify_test.go +++ b/retrievalmarket/impl/blockio/verify_test.go @@ -5,6 +5,9 @@ import ( "testing" blocks "github.com/ipfs/go-block-format" + ipldfree "github.com/ipld/go-ipld-prime/impl/free" + "github.com/ipld/go-ipld-prime/traversal/selector" + "github.com/ipld/go-ipld-prime/traversal/selector/builder" "github.com/stretchr/testify/require" "github.com/filecoin-project/go-fil-markets/retrievalmarket/impl/blockio" @@ -15,8 +18,13 @@ func TestSelectorVerifier(t *testing.T) { ctx := context.Background() testdata := tut.NewTestIPLDTree() + ssb := builder.NewSelectorSpecBuilder(ipldfree.NodeBuilder()) + allSelector := ssb.ExploreRecursive(selector.RecursionLimitNone(), ssb.ExploreAll(ssb.ExploreRecursiveEdge())) + sel, err := allSelector.Selector() + require.NoError(t, err) + t.Run("verifies correctly", func(t *testing.T) { - verifier := blockio.NewSelectorVerifier(testdata.RootNodeLnk) + verifier := blockio.NewSelectorVerifier(testdata.RootNodeLnk, sel) checkVerifySequence(ctx, t, verifier, false, []blocks.Block{ testdata.RootBlock, testdata.LeafAlphaBlock, @@ -29,15 +37,16 @@ func TestSelectorVerifier(t *testing.T) { testdata.LeafAlphaBlock, }) }) + t.Run("fed incorrect block", func(t *testing.T) { t.Run("right away", func(t *testing.T) { - verifier := blockio.NewSelectorVerifier(testdata.RootNodeLnk) + verifier := blockio.NewSelectorVerifier(testdata.RootNodeLnk, sel) checkVerifySequence(ctx, t, verifier, true, []blocks.Block{ testdata.LeafAlphaBlock, }) }) t.Run("in middle", func(t *testing.T) { - verifier := blockio.NewSelectorVerifier(testdata.RootNodeLnk) + verifier := blockio.NewSelectorVerifier(testdata.RootNodeLnk, sel) checkVerifySequence(ctx, t, verifier, true, []blocks.Block{ testdata.RootBlock, testdata.LeafAlphaBlock, @@ -46,7 +55,7 @@ func TestSelectorVerifier(t *testing.T) { }) }) t.Run("at end", func(t *testing.T) { - verifier := blockio.NewSelectorVerifier(testdata.RootNodeLnk) + verifier := blockio.NewSelectorVerifier(testdata.RootNodeLnk, sel) checkVerifySequence(ctx, t, verifier, true, []blocks.Block{ testdata.RootBlock, testdata.LeafAlphaBlock, diff --git a/retrievalmarket/impl/client.go b/retrievalmarket/impl/client.go index 39e0b2a5..1a8bd6c6 100644 --- a/retrievalmarket/impl/client.go +++ b/retrievalmarket/impl/client.go @@ -148,7 +148,12 @@ func (c *client) Retrieve(ctx context.Context, payloadCID cid.Cid, params retrie } c.dealStreams[dealID] = s - c.blockVerifiers[dealID] = blockio.NewSelectorVerifier(cidlink.Link{Cid: dealState.DealProposal.PayloadCID}) + + sel, err := allSelector() + if err != nil { + return 0, err + } + c.blockVerifiers[dealID] = blockio.NewSelectorVerifier(cidlink.Link{Cid: dealState.DealProposal.PayloadCID}, sel) err = c.stateMachines.Send(dealState.ID, retrievalmarket.ClientEventOpen) if err != nil { diff --git a/retrievalmarket/impl/common.go b/retrievalmarket/impl/common.go new file mode 100644 index 00000000..4540f2a7 --- /dev/null +++ b/retrievalmarket/impl/common.go @@ -0,0 +1,13 @@ +package retrievalimpl + +import ( + ipldfree "github.com/ipld/go-ipld-prime/impl/free" + "github.com/ipld/go-ipld-prime/traversal/selector" + "github.com/ipld/go-ipld-prime/traversal/selector/builder" +) + +func allSelector() (selector.Selector, error) { + ssb := builder.NewSelectorSpecBuilder(ipldfree.NodeBuilder()) + allSelector := ssb.ExploreRecursive(selector.RecursionLimitNone(), ssb.ExploreAll(ssb.ExploreRecursiveEdge())) + return allSelector.Selector() +} \ No newline at end of file diff --git a/retrievalmarket/impl/provider.go b/retrievalmarket/impl/provider.go index 03678a2d..90314019 100644 --- a/retrievalmarket/impl/provider.go +++ b/retrievalmarket/impl/provider.go @@ -229,7 +229,12 @@ func (p *provider) newProviderDeal(stream rmnet.RetrievalDealStream) error { p.dealStreams[pds.Identifier()] = stream loaderWithUnsealing := blockunsealing.NewLoaderWithUnsealing(context.TODO(), p.bs, p.pieceStore, cario.NewCarIO(), p.node.UnsealSector) - br := blockio.NewSelectorBlockReader(cidlink.Link{Cid: dealProposal.PayloadCID}, loaderWithUnsealing.Load) + + sel, err := allSelector() + if err != nil { + return err + } + br := blockio.NewSelectorBlockReader(cidlink.Link{Cid: dealProposal.PayloadCID}, sel, loaderWithUnsealing.Load) p.blockReaders[pds.Identifier()] = br // start the deal processing, synchronously so we can log the error and close the stream if it doesn't start From 4a6a571b627e2dc1b5493e65f06a810fa3231bd1 Mon Sep 17 00:00:00 2001 From: shannonwells Date: Wed, 1 Apr 2020 15:32:37 -0700 Subject: [PATCH 2/3] use ipld.Node and not selector.Selector as param --- retrievalmarket/impl/blockio/reader.go | 6 +++--- retrievalmarket/impl/blockio/reader_test.go | 4 +--- retrievalmarket/impl/blockio/traverser.go | 11 ++++++++--- retrievalmarket/impl/blockio/traverser_test.go | 4 +--- retrievalmarket/impl/blockio/verify.go | 9 ++++----- retrievalmarket/impl/blockio/verify_test.go | 4 +--- retrievalmarket/impl/client.go | 6 +----- retrievalmarket/impl/common.go | 8 +++++--- retrievalmarket/impl/provider.go | 6 +----- 9 files changed, 25 insertions(+), 33 deletions(-) diff --git a/retrievalmarket/impl/blockio/reader.go b/retrievalmarket/impl/blockio/reader.go index 4d0c72f6..40d179c6 100644 --- a/retrievalmarket/impl/blockio/reader.go +++ b/retrievalmarket/impl/blockio/reader.go @@ -7,7 +7,6 @@ import ( "github.com/ipld/go-ipld-prime" cidlink "github.com/ipld/go-ipld-prime/linking/cid" - "github.com/ipld/go-ipld-prime/traversal/selector" "github.com/filecoin-project/go-fil-markets/retrievalmarket" ) @@ -23,19 +22,20 @@ type BlockReader interface { // allowing the next block to be read and then advancing no further type SelectorBlockReader struct { root ipld.Link - sel selector.Selector + sel ipld.Node loader ipld.Loader traverser *Traverser } // NewSelectorBlockReader returns a new Block reader starting at the given // root and using the given loader -func NewSelectorBlockReader(root ipld.Link, sel selector.Selector, loader ipld.Loader) BlockReader { +func NewSelectorBlockReader(root ipld.Link, sel ipld.Node, loader ipld.Loader) BlockReader { return &SelectorBlockReader{root, sel, loader, nil} } // ReadBlock reads the next block in the IPLD traversal func (sr *SelectorBlockReader) ReadBlock(ctx context.Context) (retrievalmarket.Block, bool, error) { + if sr.traverser == nil { sr.traverser = NewTraverser(sr.root, sr.sel) sr.traverser.Start(ctx) diff --git a/retrievalmarket/impl/blockio/reader_test.go b/retrievalmarket/impl/blockio/reader_test.go index 25d878d9..793683d4 100644 --- a/retrievalmarket/impl/blockio/reader_test.go +++ b/retrievalmarket/impl/blockio/reader_test.go @@ -20,9 +20,7 @@ func TestSelectorReader(t *testing.T) { testdata := tut.NewTestIPLDTree() ssb := builder.NewSelectorSpecBuilder(ipldfree.NodeBuilder()) - allSelector := ssb.ExploreRecursive(selector.RecursionLimitNone(), ssb.ExploreAll(ssb.ExploreRecursiveEdge())) - sel, err := allSelector.Selector() - require.NoError(t, err) + sel := ssb.ExploreRecursive(selector.RecursionLimitNone(), ssb.ExploreAll(ssb.ExploreRecursiveEdge())).Node() t.Run("reads correctly", func(t *testing.T) { reader := blockio.NewSelectorBlockReader(testdata.RootNodeLnk, sel, testdata.Loader) diff --git a/retrievalmarket/impl/blockio/traverser.go b/retrievalmarket/impl/blockio/traverser.go index b90e1587..38288c0c 100644 --- a/retrievalmarket/impl/blockio/traverser.go +++ b/retrievalmarket/impl/blockio/traverser.go @@ -27,7 +27,7 @@ type nextResponse struct { // and waits for manual input (in the form of advance or error) type Traverser struct { root ipld.Link - selector selector.Selector + selector ipld.Node currentLink ipld.Link currentContext ipld.LinkContext isDone bool @@ -51,7 +51,7 @@ func (t *Traverser) checkState(ctx context.Context) { } // NewTraverser creates a new traverser -func NewTraverser(root ipld.Link, selector selector.Selector) *Traverser { +func NewTraverser(root ipld.Link, selector ipld.Node) *Traverser { return &Traverser{ root: root, selector: selector, @@ -99,13 +99,18 @@ func (t *Traverser) Start(ctx context.Context) { return } + sel, err := selector.ParseSelector(t.selector) + if err != nil { + t.Error(ctx, err) + return + } _ = traversal.Progress{ Cfg: &traversal.Config{ Ctx: ctx, LinkLoader: loader, LinkNodeBuilderChooser: chooser, }, - }.WalkAdv(nd, t.selector, func(traversal.Progress, ipld.Node, traversal.VisitReason) error { return nil }) + }.WalkAdv(nd, sel, func(traversal.Progress, ipld.Node, traversal.VisitReason) error { return nil }) t.writeDone(ctx) }() diff --git a/retrievalmarket/impl/blockio/traverser_test.go b/retrievalmarket/impl/blockio/traverser_test.go index 1c999e6f..b6a55573 100644 --- a/retrievalmarket/impl/blockio/traverser_test.go +++ b/retrievalmarket/impl/blockio/traverser_test.go @@ -21,9 +21,7 @@ func TestTraverser(t *testing.T) { testdata := tut.NewTestIPLDTree() ssb := builder.NewSelectorSpecBuilder(ipldfree.NodeBuilder()) - allSelector := ssb.ExploreRecursive(selector.RecursionLimitNone(), ssb.ExploreAll(ssb.ExploreRecursiveEdge())) - sel, err := allSelector.Selector() - require.NoError(t, err) + sel := ssb.ExploreRecursive(selector.RecursionLimitNone(), ssb.ExploreAll(ssb.ExploreRecursiveEdge())).Node() t.Run("traverses correctly", func(t *testing.T) { traverser := blockio.NewTraverser(testdata.RootNodeLnk, sel) diff --git a/retrievalmarket/impl/blockio/verify.go b/retrievalmarket/impl/blockio/verify.go index fc4e3f88..a72ef691 100644 --- a/retrievalmarket/impl/blockio/verify.go +++ b/retrievalmarket/impl/blockio/verify.go @@ -7,7 +7,6 @@ import ( blocks "github.com/ipfs/go-block-format" "github.com/ipld/go-ipld-prime" cidlink "github.com/ipld/go-ipld-prime/linking/cid" - "github.com/ipld/go-ipld-prime/traversal/selector" "github.com/filecoin-project/go-fil-markets/retrievalmarket" ) @@ -22,20 +21,20 @@ type BlockVerifier interface { // in the order they are traversed in a dag walk type SelectorVerifier struct { root ipld.Link - sel selector.Selector + selector ipld.Node traverser *Traverser } // NewSelectorVerifier returns a new selector based block verifier -func NewSelectorVerifier(root ipld.Link, sel selector.Selector) BlockVerifier { - return &SelectorVerifier{root, sel, nil} +func NewSelectorVerifier(root ipld.Link, selector ipld.Node) BlockVerifier { + return &SelectorVerifier{root, selector, nil} } // Verify verifies that the given block is the next one needed for the current traversal // and returns true if the traversal is done func (sv *SelectorVerifier) Verify(ctx context.Context, blk blocks.Block) (done bool, err error) { if sv.traverser == nil { - sv.traverser = NewTraverser(sv.root, sv.sel) + sv.traverser = NewTraverser(sv.root, sv.selector) sv.traverser.Start(ctx) } if sv.traverser.IsComplete(ctx) { diff --git a/retrievalmarket/impl/blockio/verify_test.go b/retrievalmarket/impl/blockio/verify_test.go index ef869f10..622833e9 100644 --- a/retrievalmarket/impl/blockio/verify_test.go +++ b/retrievalmarket/impl/blockio/verify_test.go @@ -19,9 +19,7 @@ func TestSelectorVerifier(t *testing.T) { testdata := tut.NewTestIPLDTree() ssb := builder.NewSelectorSpecBuilder(ipldfree.NodeBuilder()) - allSelector := ssb.ExploreRecursive(selector.RecursionLimitNone(), ssb.ExploreAll(ssb.ExploreRecursiveEdge())) - sel, err := allSelector.Selector() - require.NoError(t, err) + sel := ssb.ExploreRecursive(selector.RecursionLimitNone(), ssb.ExploreAll(ssb.ExploreRecursiveEdge())).Node() t.Run("verifies correctly", func(t *testing.T) { verifier := blockio.NewSelectorVerifier(testdata.RootNodeLnk, sel) diff --git a/retrievalmarket/impl/client.go b/retrievalmarket/impl/client.go index 1a8bd6c6..b42eb659 100644 --- a/retrievalmarket/impl/client.go +++ b/retrievalmarket/impl/client.go @@ -149,11 +149,7 @@ func (c *client) Retrieve(ctx context.Context, payloadCID cid.Cid, params retrie c.dealStreams[dealID] = s - sel, err := allSelector() - if err != nil { - return 0, err - } - c.blockVerifiers[dealID] = blockio.NewSelectorVerifier(cidlink.Link{Cid: dealState.DealProposal.PayloadCID}, sel) + c.blockVerifiers[dealID] = blockio.NewSelectorVerifier(cidlink.Link{Cid: dealState.DealProposal.PayloadCID}, allSelector()) err = c.stateMachines.Send(dealState.ID, retrievalmarket.ClientEventOpen) if err != nil { diff --git a/retrievalmarket/impl/common.go b/retrievalmarket/impl/common.go index 4540f2a7..0048663a 100644 --- a/retrievalmarket/impl/common.go +++ b/retrievalmarket/impl/common.go @@ -1,13 +1,15 @@ package retrievalimpl import ( + "github.com/ipld/go-ipld-prime" ipldfree "github.com/ipld/go-ipld-prime/impl/free" "github.com/ipld/go-ipld-prime/traversal/selector" "github.com/ipld/go-ipld-prime/traversal/selector/builder" ) -func allSelector() (selector.Selector, error) { +func allSelector() ipld.Node { ssb := builder.NewSelectorSpecBuilder(ipldfree.NodeBuilder()) - allSelector := ssb.ExploreRecursive(selector.RecursionLimitNone(), ssb.ExploreAll(ssb.ExploreRecursiveEdge())) - return allSelector.Selector() + return ssb.ExploreRecursive(selector.RecursionLimitNone(), + ssb.ExploreAll(ssb.ExploreRecursiveEdge())). + Node() } \ No newline at end of file diff --git a/retrievalmarket/impl/provider.go b/retrievalmarket/impl/provider.go index 90314019..fe9487fb 100644 --- a/retrievalmarket/impl/provider.go +++ b/retrievalmarket/impl/provider.go @@ -230,11 +230,7 @@ func (p *provider) newProviderDeal(stream rmnet.RetrievalDealStream) error { loaderWithUnsealing := blockunsealing.NewLoaderWithUnsealing(context.TODO(), p.bs, p.pieceStore, cario.NewCarIO(), p.node.UnsealSector) - sel, err := allSelector() - if err != nil { - return err - } - br := blockio.NewSelectorBlockReader(cidlink.Link{Cid: dealProposal.PayloadCID}, sel, loaderWithUnsealing.Load) + br := blockio.NewSelectorBlockReader(cidlink.Link{Cid: dealProposal.PayloadCID}, allSelector(), loaderWithUnsealing.Load) p.blockReaders[pds.Identifier()] = br // start the deal processing, synchronously so we can log the error and close the stream if it doesn't start From 3be764f0e4991499e62c0a56a518f9e03e951e60 Mon Sep 17 00:00:00 2001 From: shannonwells Date: Wed, 1 Apr 2020 15:37:26 -0700 Subject: [PATCH 3/3] use ipld.Node and not selector.Selector as param --- retrievalmarket/impl/blockio/reader.go | 4 ++-- retrievalmarket/impl/blockio/traverser.go | 2 +- retrievalmarket/impl/common.go | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/retrievalmarket/impl/blockio/reader.go b/retrievalmarket/impl/blockio/reader.go index 40d179c6..5ffbd2e5 100644 --- a/retrievalmarket/impl/blockio/reader.go +++ b/retrievalmarket/impl/blockio/reader.go @@ -22,7 +22,7 @@ type BlockReader interface { // allowing the next block to be read and then advancing no further type SelectorBlockReader struct { root ipld.Link - sel ipld.Node + selector ipld.Node loader ipld.Loader traverser *Traverser } @@ -37,7 +37,7 @@ func NewSelectorBlockReader(root ipld.Link, sel ipld.Node, loader ipld.Loader) B func (sr *SelectorBlockReader) ReadBlock(ctx context.Context) (retrievalmarket.Block, bool, error) { if sr.traverser == nil { - sr.traverser = NewTraverser(sr.root, sr.sel) + sr.traverser = NewTraverser(sr.root, sr.selector) sr.traverser.Start(ctx) } lnk, lnkCtx := sr.traverser.CurrentRequest(ctx) diff --git a/retrievalmarket/impl/blockio/traverser.go b/retrievalmarket/impl/blockio/traverser.go index 38288c0c..aaf85239 100644 --- a/retrievalmarket/impl/blockio/traverser.go +++ b/retrievalmarket/impl/blockio/traverser.go @@ -101,7 +101,7 @@ func (t *Traverser) Start(ctx context.Context) { sel, err := selector.ParseSelector(t.selector) if err != nil { - t.Error(ctx, err) + t.writeDone(ctx) return } _ = traversal.Progress{ diff --git a/retrievalmarket/impl/common.go b/retrievalmarket/impl/common.go index 0048663a..b1ce3f65 100644 --- a/retrievalmarket/impl/common.go +++ b/retrievalmarket/impl/common.go @@ -12,4 +12,4 @@ func allSelector() ipld.Node { return ssb.ExploreRecursive(selector.RecursionLimitNone(), ssb.ExploreAll(ssb.ExploreRecursiveEdge())). Node() -} \ No newline at end of file +}