Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow custom decisioning for a provider to decide retrieval deals. #269

Merged
merged 6 commits into from
Jun 9, 2020
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 52 additions & 24 deletions retrievalmarket/impl/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,10 @@ import (
"github.com/filecoin-project/go-fil-markets/shared"
)

type provider struct {
type RetrievalProviderOption func(p *Provider)
type DealDecider func(ctx context.Context, state retrievalmarket.ProviderDealState) (bool, string, error)

type Provider struct {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why did you decide to make this a public struct? I just want to make sure it's neccessary.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's similar to StorageMarket -- if you make it private you can't operate on (p Provider) to set a config option. StorageProviderOption and RetrievalProviderOption both take a (Storage|Retrieval)Provider.

bs blockstore.Blockstore
node retrievalmarket.RetrievalProviderNode
network rmnet.RetrievalMarketNetwork
Expand All @@ -40,9 +43,11 @@ type provider struct {
dealStreams map[retrievalmarket.ProviderDealIdentifier]rmnet.RetrievalDealStream
blockReaders map[retrievalmarket.ProviderDealIdentifier]blockio.BlockReader
stateMachines fsm.Group
dealDecider DealDecider
}

var _ retrievalmarket.RetrievalProvider = &provider{}
var _ retrievalmarket.RetrievalProvider = new(Provider)
var _ providerstates.ProviderDealEnvironment = new(Provider)

// DefaultPricePerByte is the charge per byte retrieved if the miner does
// not specifically set it
Expand All @@ -56,10 +61,13 @@ var DefaultPaymentInterval = uint64(1 << 20)
// set to to 1Mb if the miner does not explicitly set it otherwise
var DefaultPaymentIntervalIncrease = uint64(1 << 20)

// NewProvider returns a new retrieval provider
func NewProvider(minerAddress address.Address, node retrievalmarket.RetrievalProviderNode, network rmnet.RetrievalMarketNetwork, pieceStore piecestore.PieceStore, bs blockstore.Blockstore, ds datastore.Batching) (retrievalmarket.RetrievalProvider, error) {
// NewProvider returns a new retrieval Provider
func NewProvider(minerAddress address.Address, node retrievalmarket.RetrievalProviderNode,
network rmnet.RetrievalMarketNetwork, pieceStore piecestore.PieceStore,
bs blockstore.Blockstore, ds datastore.Batching, opts ...RetrievalProviderOption,
) (retrievalmarket.RetrievalProvider, error) {

p := &provider{
p := &Provider{
bs: bs,
node: node,
network: network,
Expand All @@ -79,40 +87,48 @@ func NewProvider(minerAddress address.Address, node retrievalmarket.RetrievalPro
StateEntryFuncs: providerstates.ProviderStateEntryFuncs,
Notifier: p.notifySubscribers,
})
p.stateMachines = statemachines
if err != nil {
return nil, err
}
p.Configure(opts...)
p.stateMachines = statemachines
return p, nil
}

func (p *Provider) RunDealDecisioningLogic(ctx context.Context, state retrievalmarket.ProviderDealState) (bool, string, error) {
if p.dealDecider == nil {
return true, "", nil
}
return p.dealDecider(ctx, state)
}

// Stop stops handling incoming requests
func (p *provider) Stop() error {
func (p *Provider) Stop() error {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this had to be exported similarly to storagemarket, so a decider func is possible

return p.network.StopHandlingRequests()
}

// Start begins listening for deals on the given host
func (p *provider) Start() error {
func (p *Provider) Start() error {
return p.network.SetDelegate(p)
}

// V0
// SetPricePerByte sets the price per byte a miner charges for retrievals
func (p *provider) SetPricePerByte(price abi.TokenAmount) {
func (p *Provider) SetPricePerByte(price abi.TokenAmount) {
p.pricePerByte = price
}

// SetPaymentInterval sets the maximum number of bytes a a provider will send before
// SetPaymentInterval sets the maximum number of bytes a a Provider will send before
// requesting further payment, and the rate at which that value increases
func (p *provider) SetPaymentInterval(paymentInterval uint64, paymentIntervalIncrease uint64) {
func (p *Provider) SetPaymentInterval(paymentInterval uint64, paymentIntervalIncrease uint64) {
p.paymentInterval = paymentInterval
p.paymentIntervalIncrease = paymentIntervalIncrease
}

// unsubscribeAt returns a function that removes an item from the subscribers list by comparing
// their reflect.ValueOf before pulling the item out of the slice. Does not preserve order.
// Subsequent, repeated calls to the func with the same Subscriber are a no-op.
func (p *provider) unsubscribeAt(sub retrievalmarket.ProviderSubscriber) retrievalmarket.Unsubscribe {
func (p *Provider) unsubscribeAt(sub retrievalmarket.ProviderSubscriber) retrievalmarket.Unsubscribe {
return func() {
p.subscribersLk.Lock()
defer p.subscribersLk.Unlock()
Expand All @@ -127,7 +143,7 @@ func (p *provider) unsubscribeAt(sub retrievalmarket.ProviderSubscriber) retriev
}
}

func (p *provider) notifySubscribers(eventName fsm.EventName, state fsm.StateType) {
func (p *Provider) notifySubscribers(eventName fsm.EventName, state fsm.StateType) {
p.subscribersLk.RLock()
defer p.subscribersLk.RUnlock()
evt := eventName.(retrievalmarket.ProviderEvent)
Expand All @@ -138,7 +154,7 @@ func (p *provider) notifySubscribers(eventName fsm.EventName, state fsm.StateTyp
}

// SubscribeToEvents listens for events that happen related to client retrievals
func (p *provider) SubscribeToEvents(subscriber retrievalmarket.ProviderSubscriber) retrievalmarket.Unsubscribe {
func (p *Provider) SubscribeToEvents(subscriber retrievalmarket.ProviderSubscriber) retrievalmarket.Unsubscribe {
p.subscribersLk.Lock()
p.subscribers = append(p.subscribers, subscriber)
p.subscribersLk.Unlock()
Expand All @@ -147,15 +163,15 @@ func (p *provider) SubscribeToEvents(subscriber retrievalmarket.ProviderSubscrib
}

// V1
func (p *provider) SetPricePerUnseal(price abi.TokenAmount) {
func (p *Provider) SetPricePerUnseal(price abi.TokenAmount) {
panic("not implemented")
}

func (p *provider) ListDeals() map[retrievalmarket.ProviderDealID]retrievalmarket.ProviderDealState {
func (p *Provider) ListDeals() map[retrievalmarket.ProviderDealID]retrievalmarket.ProviderDealState {
panic("not implemented")
}

func (p *provider) HandleQueryStream(stream rmnet.RetrievalQueryStream) {
func (p *Provider) HandleQueryStream(stream rmnet.RetrievalQueryStream) {
defer stream.Close()
query, err := stream.ReadQuery()
if err != nil {
Expand Down Expand Up @@ -212,7 +228,7 @@ func (p *provider) HandleQueryStream(stream rmnet.RetrievalQueryStream) {
}
}

func (p *provider) HandleDealStream(stream rmnet.RetrievalDealStream) {
func (p *Provider) HandleDealStream(stream rmnet.RetrievalDealStream) {
// read deal proposal (or fail)
err := p.newProviderDeal(stream)
if err != nil {
Expand All @@ -221,7 +237,7 @@ func (p *provider) HandleDealStream(stream rmnet.RetrievalDealStream) {
}
}

func (p *provider) newProviderDeal(stream rmnet.RetrievalDealStream) error {
func (p *Provider) newProviderDeal(stream rmnet.RetrievalDealStream) error {
dealProposal, err := stream.ReadDealProposal()
if err != nil {
return err
Expand Down Expand Up @@ -264,15 +280,15 @@ func (p *provider) newProviderDeal(stream rmnet.RetrievalDealStream) error {
return nil
}

func (p *provider) Node() retrievalmarket.RetrievalProviderNode {
func (p *Provider) Node() retrievalmarket.RetrievalProviderNode {
return p.node
}

func (p *provider) DealStream(id retrievalmarket.ProviderDealIdentifier) rmnet.RetrievalDealStream {
func (p *Provider) DealStream(id retrievalmarket.ProviderDealIdentifier) rmnet.RetrievalDealStream {
return p.dealStreams[id]
}

func (p *provider) CheckDealParams(pricePerByte abi.TokenAmount, paymentInterval uint64, paymentIntervalIncrease uint64) error {
func (p *Provider) CheckDealParams(pricePerByte abi.TokenAmount, paymentInterval uint64, paymentIntervalIncrease uint64) error {
if pricePerByte.LessThan(p.pricePerByte) {
return errors.New("Price per byte too low")
}
Expand All @@ -285,15 +301,15 @@ func (p *provider) CheckDealParams(pricePerByte abi.TokenAmount, paymentInterval
return nil
}

func (p *provider) NextBlock(ctx context.Context, id retrievalmarket.ProviderDealIdentifier) (retrievalmarket.Block, bool, error) {
func (p *Provider) NextBlock(ctx context.Context, id retrievalmarket.ProviderDealIdentifier) (retrievalmarket.Block, bool, error) {
br, ok := p.blockReaders[id]
if !ok {
return retrievalmarket.Block{}, false, errors.New("Could not read block")
}
return br.ReadBlock(ctx)
}

func (p *provider) GetPieceSize(c cid.Cid) (uint64, error) {
func (p *Provider) GetPieceSize(c cid.Cid) (uint64, error) {
pieceInfo, err := getPieceInfoFromCid(p.pieceStore, c, cid.Undef)
if err != nil {
return 0, err
Expand All @@ -304,6 +320,18 @@ func (p *provider) GetPieceSize(c cid.Cid) (uint64, error) {
return pieceInfo.Deals[0].Length, nil
}

func (p *Provider) Configure(opts ...RetrievalProviderOption) {
for _, opt := range opts {
opt(p)
}
}

func DealDeciderOpt(dd DealDecider) RetrievalProviderOption {
return func(provider *Provider) {
provider.dealDecider = dd
}
}

func getPieceInfoFromCid(pieceStore piecestore.PieceStore, payloadCID, pieceCID cid.Cid) (piecestore.PieceInfo, error) {
cidInfo, err := pieceStore.GetCIDInfo(payloadCID)
if err != nil {
Expand Down
36 changes: 36 additions & 0 deletions retrievalmarket/impl/provider_test.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
package retrievalimpl_test

import (
"context"
"testing"

"github.com/filecoin-project/go-address"
"github.com/filecoin-project/specs-actors/actors/abi"
spect "github.com/filecoin-project/specs-actors/support/testing"
"github.com/ipfs/go-cid"
"github.com/ipfs/go-datastore"
dss "github.com/ipfs/go-datastore/sync"
Expand Down Expand Up @@ -214,6 +216,40 @@ func TestHandleQueryStream(t *testing.T) {

}

func TestProviderConfigOpts(t *testing.T) {
var sawOpt int
opt1 := func(p *retrievalimpl.Provider) { sawOpt++ }
opt2 := func(p *retrievalimpl.Provider) { sawOpt += 2 }
ds := datastore.NewMapDatastore()
bs := bstore.NewBlockstore(ds)
p, err := retrievalimpl.NewProvider(
spect.NewIDAddr(t, 2344),
testnodes.NewTestRetrievalProviderNode(),
tut.NewTestRetrievalMarketNetwork(tut.TestNetworkParams{}),
tut.NewTestPieceStore(),
bs, ds, opt1, opt2,
)
require.NoError(t, err)
assert.NotNil(t, p)
assert.Equal(t, 3, sawOpt)

// just test that we can create a DealDeciderOpt function and that it runs
// successfully in the constructor
ddOpt := retrievalimpl.DealDeciderOpt(
func(_ context.Context, state retrievalmarket.ProviderDealState) (bool, string, error) {
return true, "yes", nil
})

p, err = retrievalimpl.NewProvider(
spect.NewIDAddr(t, 2344),
testnodes.NewTestRetrievalProviderNode(),
tut.NewTestRetrievalMarketNetwork(tut.TestNetworkParams{}),
tut.NewTestPieceStore(),
bs, ds, ddOpt)
require.NoError(t, err)
require.NotNil(t, p)
}

// loadPieceCIDS sets expectations to receive expectedPieceCID and 3 other random PieceCIDs to
// disinguish the case of a PayloadCID is found but the PieceCID is not
func loadPieceCIDS(t *testing.T, pieceStore *tut.TestPieceStore, expPayloadCID, expectedPieceCID cid.Cid) {
Expand Down
10 changes: 8 additions & 2 deletions retrievalmarket/impl/providerstates/provider_fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,17 @@ var ProviderEvents = fsm.Events{
return nil
},
),
fsm.Event(rm.ProviderEventDealReceived).
From(rm.DealStatusNew).To(rm.DealStatusAwaitingAcceptance),
fsm.Event(rm.ProviderEventWriteResponseFailed).
FromAny().To(rm.DealStatusErrored).
Action(func(deal *rm.ProviderDealState, err error) error {
deal.Message = xerrors.Errorf("writing deal response: %w", err).Error()
return nil
}),
fsm.Event(rm.ProviderEventDecisioningError).
From(rm.DealStatusAwaitingAcceptance).To(rm.DealStatusErrored).
Action(recordError),
fsm.Event(rm.ProviderEventReadPaymentFailed).
FromAny().To(rm.DealStatusErrored).
Action(recordError),
Expand All @@ -46,10 +51,10 @@ var ProviderEvents = fsm.Events{
return nil
}),
fsm.Event(rm.ProviderEventDealRejected).
From(rm.DealStatusNew).To(rm.DealStatusRejected).
FromMany(rm.DealStatusNew, rm.DealStatusAwaitingAcceptance).To(rm.DealStatusRejected).
Action(recordError),
fsm.Event(rm.ProviderEventDealAccepted).
From(rm.DealStatusNew).To(rm.DealStatusAccepted).
From(rm.DealStatusAwaitingAcceptance).To(rm.DealStatusAccepted).
Action(func(deal *rm.ProviderDealState, dealProposal rm.DealProposal) error {
deal.DealProposal = dealProposal
deal.CurrentInterval = deal.PaymentInterval
Expand Down Expand Up @@ -96,6 +101,7 @@ var ProviderStateEntryFuncs = fsm.StateEntryFuncs{
rm.DealStatusRejected: SendFailResponse,
rm.DealStatusDealNotFound: SendFailResponse,
rm.DealStatusOngoing: SendBlocks,
rm.DealStatusAwaitingAcceptance: DecideOnDeal,
rm.DealStatusAccepted: SendBlocks,
rm.DealStatusFundsNeeded: ProcessPayment,
rm.DealStatusFundsNeededLastPayment: ProcessPayment,
Expand Down
24 changes: 19 additions & 5 deletions retrievalmarket/impl/providerstates/provider_states.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package providerstates

import (
"context"
"errors"

"github.com/filecoin-project/go-statemachine/fsm"
"github.com/filecoin-project/specs-actors/actors/abi"
Expand All @@ -20,6 +21,7 @@ type ProviderDealEnvironment interface {
DealStream(id rm.ProviderDealIdentifier) rmnet.RetrievalDealStream
NextBlock(context.Context, rm.ProviderDealIdentifier) (rm.Block, bool, error)
CheckDealParams(pricePerByte abi.TokenAmount, paymentInterval uint64, paymentIntervalIncrease uint64) error
RunDealDecisioningLogic(ctx context.Context, state rm.ProviderDealState) (bool, string, error)
}

// ReceiveDeal receives and evaluates a deal proposal
Expand All @@ -36,21 +38,33 @@ func ReceiveDeal(ctx fsm.Context, environment ProviderDealEnvironment, deal rm.P
}

// check that the deal parameters match our required parameters (or reject)
err = environment.CheckDealParams(dealProposal.PricePerByte, dealProposal.PaymentInterval, dealProposal.PaymentIntervalIncrease)
err = environment.CheckDealParams(dealProposal.PricePerByte,
Copy link
Contributor Author

@shannonwells shannonwells Jun 8, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was conflicted here between moving all decisioning to DecideOnDeal and leaving just the param check here. I decided an up front parameter check is okay here before the "meat" of decisioning happens, but I can be convinced otherwise.

dealProposal.PaymentInterval,
dealProposal.PaymentIntervalIncrease)
if err != nil {
return ctx.Trigger(rm.ProviderEventDealRejected, err)
}
return ctx.Trigger(rm.ProviderEventDealReceived)
}

err = environment.DealStream(deal.Identifier()).WriteDealResponse(rm.DealResponse{
// DecideOnDeal
func DecideOnDeal(ctx fsm.Context, env ProviderDealEnvironment, state rm.ProviderDealState) error {
accepted, reason, err := env.RunDealDecisioningLogic(ctx.Context(), state)
if err != nil {
return ctx.Trigger(rm.ProviderEventDecisioningError, err)
}
if !accepted {
return ctx.Trigger(rm.ProviderEventDealRejected, errors.New(reason))
}
err = env.DealStream(state.Identifier()).WriteDealResponse(rm.DealResponse{
Status: rm.DealStatusAccepted,
ID: deal.ID,
ID: state.ID,
})
if err != nil {
return ctx.Trigger(rm.ProviderEventWriteResponseFailed, err)
}

return ctx.Trigger(rm.ProviderEventDealAccepted, dealProposal)

return ctx.Trigger(rm.ProviderEventDealAccepted, state.DealProposal)
}

// SendBlocks sends blocks to the client until funds are needed
Expand Down
Loading