diff --git a/node/builder.go b/node/builder.go index 641bdffddeb..20e7fe081ad 100644 --- a/node/builder.go +++ b/node/builder.go @@ -45,7 +45,6 @@ import ( "github.com/filecoin-project/lotus/peermgr" retrievalmarket "github.com/filecoin-project/lotus/retrieval" "github.com/filecoin-project/lotus/retrieval/discovery" - retrievalimpl "github.com/filecoin-project/lotus/retrieval/impl" "github.com/filecoin-project/lotus/storage" "github.com/filecoin-project/lotus/storage/sectorblocks" ) @@ -244,7 +243,7 @@ func Online() Option { Override(new(dtypes.StagingDAG), modules.StagingDAG), - Override(new(retrievalmarket.RetrievalProvider), retrievalimpl.NewProvider), + Override(new(retrievalmarket.RetrievalProvider), modules.RetrievalProvider), Override(new(dtypes.ProviderDealStore), modules.NewProviderDealStore), Override(new(dtypes.ProviderDataTransfer), modules.NewProviderDAGServiceDataTransfer), Override(new(*deals.ProviderRequestValidator), deals.NewProviderRequestValidator), diff --git a/node/modules/client.go b/node/modules/client.go index 102e9572af0..f5a9dc4e758 100644 --- a/node/modules/client.go +++ b/node/modules/client.go @@ -2,6 +2,7 @@ package modules import ( "context" + "github.com/filecoin-project/lotus/retrievaladapter" "path/filepath" "reflect" @@ -87,5 +88,6 @@ func ClientDAG(mctx helpers.MetricsCtx, lc fx.Lifecycle, ibs dtypes.ClientBlocks // RetrievalClient creates a new retrieval client attached to the client blockstore func RetrievalClient(h host.Host, bs dtypes.ClientBlockstore, pmgr *paych.Manager, payapi payapi.PaychAPI) retrievalmarket.RetrievalClient { - return retrievalimpl.NewClient(h, bs, pmgr, payapi) + adapter := retrievaladapter.NewRetrievalClientNode(pmgr, payapi) + return retrievalimpl.NewClient(h, bs, adapter) } diff --git a/node/modules/storageminer.go b/node/modules/storageminer.go index a23ad584ed1..8cdc2967e1d 100644 --- a/node/modules/storageminer.go +++ b/node/modules/storageminer.go @@ -31,7 +31,10 @@ import ( "github.com/filecoin-project/lotus/node/modules/helpers" "github.com/filecoin-project/lotus/node/repo" retrievalmarket "github.com/filecoin-project/lotus/retrieval" + retrievalimpl "github.com/filecoin-project/lotus/retrieval/impl" + "github.com/filecoin-project/lotus/retrievaladapter" "github.com/filecoin-project/lotus/storage" + "github.com/filecoin-project/lotus/storage/sectorblocks" ) func minerAddrFromDS(ds dtypes.MetadataDS) (address.Address, error) { @@ -235,3 +238,9 @@ func SealTicketGen(api api.FullNode) storage.TicketFn { }, nil } } + +// RetrievalProvider creates a new retrieval provider attached to the provider blockstore +func RetrievalProvider(sblks *sectorblocks.SectorBlocks, full api.FullNode) retrievalmarket.RetrievalProvider { + adapter := retrievaladapter.NewRetrievalProviderNode(full) + return retrievalimpl.NewProvider(sblks, adapter) +} diff --git a/retrieval/impl/client.go b/retrieval/impl/client.go index 78447b0e5b1..a53fde8aeb2 100644 --- a/retrieval/impl/client.go +++ b/retrieval/impl/client.go @@ -17,24 +17,18 @@ import ( "github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/build" - "github.com/filecoin-project/lotus/chain/address" "github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/lib/cborutil" - payapi "github.com/filecoin-project/lotus/node/impl/paych" - "github.com/filecoin-project/lotus/paych" retrievalmarket "github.com/filecoin-project/lotus/retrieval" ) var log = logging.Logger("retrieval") type client struct { - h host.Host - bs blockstore.Blockstore - + h host.Host + bs blockstore.Blockstore + node retrievalmarket.RetrievalClientNode // The parameters should be replaced by RetrievalClientNode - // for https://github.com/filecoin-project/go-retrieval-market-project/issues/3 - pmgr *paych.Manager - payapi payapi.PaychAPI nextDealLk sync.Mutex nextDealID retrievalmarket.DealID @@ -42,8 +36,8 @@ type client struct { } // NewClient creates a new retrieval client -func NewClient(h host.Host, bs blockstore.Blockstore, pmgr *paych.Manager, payapi payapi.PaychAPI) retrievalmarket.RetrievalClient { - return &client{h: h, bs: bs, pmgr: pmgr, payapi: payapi} +func NewClient(h host.Host, bs blockstore.Blockstore, node retrievalmarket.RetrievalClientNode) retrievalmarket.RetrievalClient { + return &client{h: h, bs: bs, node: node} } // V0 @@ -94,7 +88,7 @@ func (c *client) Query(ctx context.Context, p retrievalmarket.RetrievalPeer, pie // TODO: Update to match spec for V0 Epic: // https://github.com/filecoin-project/go-retrieval-market-project/issues/9 -func (c *client) Retrieve(ctx context.Context, pieceCID []byte, params retrievalmarket.Params, totalFunds retrievalmarket.BigInt, miner peer.ID, clientWallet retrievalmarket.Address, minerWallet retrievalmarket.Address) retrievalmarket.DealID { +func (c *client) Retrieve(ctx context.Context, pieceCID []byte, params retrievalmarket.Params, totalFunds types.BigInt, miner peer.ID, clientWallet retrievalmarket.Address, minerWallet retrievalmarket.Address) retrievalmarket.DealID { /* The implementation of this function is just wrapper for the old code which retrieves UnixFS pieces -- it will be replaced when we do the V0 implementation of the module */ c.nextDealLk.Lock() @@ -158,7 +152,7 @@ func (c *client) SubscribeToEvents(subscriber retrievalmarket.ClientSubscriber) } // V1 -func (c *client) AddMoreFunds(id retrievalmarket.DealID, amount retrievalmarket.BigInt) error { +func (c *client) AddMoreFunds(id retrievalmarket.DealID, amount types.BigInt) error { panic("not implemented") } @@ -175,7 +169,7 @@ func (c *client) ListDeals() map[retrievalmarket.DealID]retrievalmarket.ClientDe } type clientStream struct { - payapi payapi.PaychAPI + node retrievalmarket.RetrievalClientNode stream network.Stream peeker cbg.BytePeeker @@ -183,7 +177,7 @@ type clientStream struct { size types.BigInt offset uint64 - paych address.Address + paych retrievalmarket.Address lane uint64 total types.BigInt transferred types.BigInt @@ -207,7 +201,7 @@ type clientStream struct { // < ..Blocks // > DealProposal(...) // < ... -func (c *client) retrieveUnixfs(ctx context.Context, root cid.Cid, size uint64, total types.BigInt, miner peer.ID, client, minerAddr address.Address) error { +func (c *client) retrieveUnixfs(ctx context.Context, root cid.Cid, size uint64, total types.BigInt, miner peer.ID, client, minerAddr retrievalmarket.Address) error { s, err := c.h.NewStream(ctx, miner, retrievalmarket.ProtocolID) if err != nil { return err @@ -218,17 +212,17 @@ func (c *client) retrieveUnixfs(ctx context.Context, root cid.Cid, size uint64, // TODO: Support in handler // TODO: Allow client to specify this - paych, _, err := c.pmgr.GetPaych(ctx, client, minerAddr, total) + paych, err := c.node.GetOrCreatePaymentChannel(ctx, client, minerAddr, total) if err != nil { return xerrors.Errorf("getting payment channel: %w", err) } - lane, err := c.pmgr.AllocateLane(paych) + lane, err := c.node.AllocateLane(paych) if err != nil { return xerrors.Errorf("allocating payment lane: %w", err) } cst := clientStream{ - payapi: c.payapi, + node: c.node, stream: s, peeker: cbg.GetPeeker(s), @@ -372,7 +366,7 @@ func (cst *clientStream) consumeBlockMessage(block Block) (uint64, error) { func (cst *clientStream) setupPayment(ctx context.Context, toSend types.BigInt) (api.PaymentInfo, error) { amount := types.BigAdd(cst.transferred, toSend) - sv, err := cst.payapi.PaychVoucherCreate(ctx, cst.paych, amount, cst.lane) + sv, err := cst.node.CreatePaymentVoucher(ctx, cst.paych, amount, cst.lane) if err != nil { return api.PaymentInfo{}, err } diff --git a/retrieval/impl/provider.go b/retrieval/impl/provider.go index dd3c28cb383..c4619b84724 100644 --- a/retrieval/impl/provider.go +++ b/retrieval/impl/provider.go @@ -13,7 +13,6 @@ import ( "github.com/libp2p/go-libp2p-core/network" "golang.org/x/xerrors" - "github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/build" "github.com/filecoin-project/lotus/chain/address" "github.com/filecoin-project/lotus/chain/types" @@ -34,7 +33,7 @@ type provider struct { // TODO: Replace with RetrievalProviderNode for // https://github.com/filecoin-project/go-retrieval-market-project/issues/4 - full RetrMinerAPI + node retrievalmarket.RetrievalProviderNode pricePerByte retrievalmarket.BigInt @@ -42,10 +41,10 @@ type provider struct { } // NewProvider returns a new retrieval provider -func NewProvider(sblks *sectorblocks.SectorBlocks, full api.FullNode) retrievalmarket.RetrievalProvider { +func NewProvider(sblks *sectorblocks.SectorBlocks, node retrievalmarket.RetrievalProviderNode) retrievalmarket.RetrievalProvider { return &provider{ sectorBlocks: sblks, - full: full, + node: node, pricePerByte: types.NewInt(2), // TODO: allow setting } @@ -202,7 +201,7 @@ func (hnd *handlerDeal) handleNext() (bool, error) { } expPayment := types.BigMul(hnd.p.pricePerByte, types.NewInt(deal.Params.Unixfs0.Size)) - if _, err := hnd.p.full.PaychVoucherAdd(context.TODO(), deal.Payment.Channel, deal.Payment.Vouchers[0], nil, expPayment); err != nil { + if _, err := hnd.p.node.SavePaymentVoucher(context.TODO(), deal.Payment.Channel, deal.Payment.Vouchers[0], nil, expPayment); err != nil { return false, xerrors.Errorf("processing retrieval payment: %w", err) } diff --git a/retrievaladapter/client.go b/retrievaladapter/client.go new file mode 100644 index 00000000000..ff7d2f0fb7c --- /dev/null +++ b/retrievaladapter/client.go @@ -0,0 +1,41 @@ +package retrievaladapter + +import ( + "context" + + payapi "github.com/filecoin-project/lotus/node/impl/paych" + "github.com/filecoin-project/lotus/paych" + retrievalmarket "github.com/filecoin-project/lotus/retrieval" +) + +type retrievalClientNode struct { + pmgr *paych.Manager + payapi payapi.PaychAPI +} + +// NewRetrievalClientNode returns a new node adapter for a retrieval client that talks to the +// Lotus Node +func NewRetrievalClientNode(pmgr *paych.Manager, payapi payapi.PaychAPI) retrievalmarket.RetrievalClientNode { + return &retrievalClientNode{pmgr: pmgr, payapi: payapi} +} + +// GetOrCreatePaymentChannel sets up a new payment channel if one does not exist +// between a client and a miner and insures the client has the given amount of funds available in the channel +func (rcn *retrievalClientNode) GetOrCreatePaymentChannel(ctx context.Context, clientAddress retrievalmarket.Address, minerAddress retrievalmarket.Address, clientFundsAvailable retrievalmarket.BigInt) (retrievalmarket.Address, error) { + paych, _, err := rcn.pmgr.GetPaych(ctx, clientAddress, minerAddress, clientFundsAvailable) + return paych, err +} + +// Allocate late creates a lane within a payment channel so that calls to +// CreatePaymentVoucher will automatically make vouchers only for the difference +// in total +func (rcn *retrievalClientNode) AllocateLane(paymentChannel retrievalmarket.Address) (uint64, error) { + return rcn.pmgr.AllocateLane(paymentChannel) +} + +// CreatePaymentVoucher creates a new payment voucher in the given lane for a +// given payment channel so that all the payment vouchers in the lane add up +// to the given amount (so the payment voucher will be for the difference) +func (rcn *retrievalClientNode) CreatePaymentVoucher(ctx context.Context, paymentChannel retrievalmarket.Address, amount retrievalmarket.BigInt, lane uint64) (*retrievalmarket.SignedVoucher, error) { + return rcn.payapi.PaychVoucherCreate(ctx, paymentChannel, amount, lane) +} diff --git a/retrievaladapter/provider.go b/retrievaladapter/provider.go new file mode 100644 index 00000000000..64da28cace5 --- /dev/null +++ b/retrievaladapter/provider.go @@ -0,0 +1,23 @@ +package retrievaladapter + +import ( + "context" + + "github.com/filecoin-project/lotus/api" + "github.com/filecoin-project/lotus/chain/address" + retrievalmarket "github.com/filecoin-project/lotus/retrieval" +) + +type retrievalProviderNode struct { + full api.FullNode +} + +// NewRetrievalProviderNode returns a new node adapter for a retrieval provider that talks to the +// Lotus Node +func NewRetrievalProviderNode(full api.FullNode) retrievalmarket.RetrievalProviderNode { + return &retrievalProviderNode{full} +} + +func (rpn *retrievalProviderNode) SavePaymentVoucher(ctx context.Context, paymentChannel address.Address, voucher *retrievalmarket.SignedVoucher, proof []byte, expectedAmount retrievalmarket.BigInt) (retrievalmarket.BigInt, error) { + return rpn.full.PaychVoucherAdd(ctx, paymentChannel, voucher, proof, expectedAmount) +}