Skip to content

Commit

Permalink
feat(retrieval): add node implementations
Browse files Browse the repository at this point in the history
add node adapters for client & provider so that retrieval can be extracted
  • Loading branch information
hannahhoward committed Dec 19, 2019
1 parent 369a30d commit 8cd06a5
Show file tree
Hide file tree
Showing 7 changed files with 95 additions and 28 deletions.
3 changes: 1 addition & 2 deletions node/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ import (
"github.com/filecoin-project/lotus/peermgr"
retrievalmarket "github.com/filecoin-project/lotus/retrieval"
"github.com/filecoin-project/lotus/retrieval/discovery"
retrievalimpl "github.com/filecoin-project/lotus/retrieval/impl"
"github.com/filecoin-project/lotus/storage"
"github.com/filecoin-project/lotus/storage/sectorblocks"
)
Expand Down Expand Up @@ -247,7 +246,7 @@ func Online() Option {
Override(new(dtypes.StagingBlockstore), modules.StagingBlockstore),
Override(new(dtypes.StagingDAG), modules.StagingDAG),
Override(new(dtypes.StagingGraphsync), modules.StagingGraphsync),
Override(new(retrievalmarket.RetrievalProvider), retrievalimpl.NewProvider),
Override(new(retrievalmarket.RetrievalProvider), modules.RetrievalProvider),
Override(new(dtypes.ProviderDealStore), modules.NewProviderDealStore),
Override(new(dtypes.ProviderDataTransfer), modules.NewProviderDAGServiceDataTransfer),
Override(new(*deals.ProviderRequestValidator), deals.NewProviderRequestValidator),
Expand Down
4 changes: 3 additions & 1 deletion node/modules/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package modules

import (
"context"
"github.com/filecoin-project/lotus/retrievaladapter"
"path/filepath"
"reflect"

Expand Down Expand Up @@ -104,5 +105,6 @@ func ClientGraphsync(mctx helpers.MetricsCtx, lc fx.Lifecycle, ibs dtypes.Client

// RetrievalClient creates a new retrieval client attached to the client blockstore
func RetrievalClient(h host.Host, bs dtypes.ClientBlockstore, pmgr *paych.Manager, payapi payapi.PaychAPI) retrievalmarket.RetrievalClient {
return retrievalimpl.NewClient(h, bs, pmgr, payapi)
adapter := retrievaladapter.NewRetrievalClientNode(pmgr, payapi)
return retrievalimpl.NewClient(h, bs, adapter)
}
9 changes: 9 additions & 0 deletions node/modules/storageminer.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,10 @@ import (
"github.com/filecoin-project/lotus/node/modules/helpers"
"github.com/filecoin-project/lotus/node/repo"
retrievalmarket "github.com/filecoin-project/lotus/retrieval"
retrievalimpl "github.com/filecoin-project/lotus/retrieval/impl"
"github.com/filecoin-project/lotus/retrievaladapter"
"github.com/filecoin-project/lotus/storage"
"github.com/filecoin-project/lotus/storage/sectorblocks"
)

func minerAddrFromDS(ds dtypes.MetadataDS) (address.Address, error) {
Expand Down Expand Up @@ -259,3 +262,9 @@ func SealTicketGen(api api.FullNode) storage.TicketFn {
}, nil
}
}

// RetrievalProvider creates a new retrieval provider attached to the provider blockstore
func RetrievalProvider(sblks *sectorblocks.SectorBlocks, full api.FullNode) retrievalmarket.RetrievalProvider {
adapter := retrievaladapter.NewRetrievalProviderNode(full)
return retrievalimpl.NewProvider(sblks, adapter)
}
34 changes: 14 additions & 20 deletions retrieval/impl/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,33 +17,27 @@ import (

"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/address"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/lib/cborutil"
payapi "github.com/filecoin-project/lotus/node/impl/paych"
"github.com/filecoin-project/lotus/paych"
retrievalmarket "github.com/filecoin-project/lotus/retrieval"
)

var log = logging.Logger("retrieval")

type client struct {
h host.Host
bs blockstore.Blockstore

h host.Host
bs blockstore.Blockstore
node retrievalmarket.RetrievalClientNode
// The parameters should be replaced by RetrievalClientNode
// for https://github.com/filecoin-project/go-retrieval-market-project/issues/3
pmgr *paych.Manager
payapi payapi.PaychAPI

nextDealLk sync.Mutex
nextDealID retrievalmarket.DealID
subscribers []retrievalmarket.ClientSubscriber
}

// NewClient creates a new retrieval client
func NewClient(h host.Host, bs blockstore.Blockstore, pmgr *paych.Manager, payapi payapi.PaychAPI) retrievalmarket.RetrievalClient {
return &client{h: h, bs: bs, pmgr: pmgr, payapi: payapi}
func NewClient(h host.Host, bs blockstore.Blockstore, node retrievalmarket.RetrievalClientNode) retrievalmarket.RetrievalClient {
return &client{h: h, bs: bs, node: node}
}

// V0
Expand Down Expand Up @@ -94,7 +88,7 @@ func (c *client) Query(ctx context.Context, p retrievalmarket.RetrievalPeer, pie

// TODO: Update to match spec for V0 Epic:
// https://github.com/filecoin-project/go-retrieval-market-project/issues/9
func (c *client) Retrieve(ctx context.Context, pieceCID []byte, params retrievalmarket.Params, totalFunds retrievalmarket.BigInt, miner peer.ID, clientWallet retrievalmarket.Address, minerWallet retrievalmarket.Address) retrievalmarket.DealID {
func (c *client) Retrieve(ctx context.Context, pieceCID []byte, params retrievalmarket.Params, totalFunds types.BigInt, miner peer.ID, clientWallet retrievalmarket.Address, minerWallet retrievalmarket.Address) retrievalmarket.DealID {
/* The implementation of this function is just wrapper for the old code which retrieves UnixFS pieces
-- it will be replaced when we do the V0 implementation of the module */
c.nextDealLk.Lock()
Expand Down Expand Up @@ -158,7 +152,7 @@ func (c *client) SubscribeToEvents(subscriber retrievalmarket.ClientSubscriber)
}

// V1
func (c *client) AddMoreFunds(id retrievalmarket.DealID, amount retrievalmarket.BigInt) error {
func (c *client) AddMoreFunds(id retrievalmarket.DealID, amount types.BigInt) error {
panic("not implemented")
}

Expand All @@ -175,15 +169,15 @@ func (c *client) ListDeals() map[retrievalmarket.DealID]retrievalmarket.ClientDe
}

type clientStream struct {
payapi payapi.PaychAPI
node retrievalmarket.RetrievalClientNode
stream network.Stream
peeker cbg.BytePeeker

root cid.Cid
size types.BigInt
offset uint64

paych address.Address
paych retrievalmarket.Address
lane uint64
total types.BigInt
transferred types.BigInt
Expand All @@ -207,7 +201,7 @@ type clientStream struct {
// < ..Blocks
// > DealProposal(...)
// < ...
func (c *client) retrieveUnixfs(ctx context.Context, root cid.Cid, size uint64, total types.BigInt, miner peer.ID, client, minerAddr address.Address) error {
func (c *client) retrieveUnixfs(ctx context.Context, root cid.Cid, size uint64, total types.BigInt, miner peer.ID, client, minerAddr retrievalmarket.Address) error {
s, err := c.h.NewStream(ctx, miner, retrievalmarket.ProtocolID)
if err != nil {
return err
Expand All @@ -218,17 +212,17 @@ func (c *client) retrieveUnixfs(ctx context.Context, root cid.Cid, size uint64,
// TODO: Support in handler
// TODO: Allow client to specify this

paych, _, err := c.pmgr.GetPaych(ctx, client, minerAddr, total)
paych, err := c.node.GetOrCreatePaymentChannel(ctx, client, minerAddr, total)
if err != nil {
return xerrors.Errorf("getting payment channel: %w", err)
}
lane, err := c.pmgr.AllocateLane(paych)
lane, err := c.node.AllocateLane(paych)
if err != nil {
return xerrors.Errorf("allocating payment lane: %w", err)
}

cst := clientStream{
payapi: c.payapi,
node: c.node,
stream: s,
peeker: cbg.GetPeeker(s),

Expand Down Expand Up @@ -372,7 +366,7 @@ func (cst *clientStream) consumeBlockMessage(block Block) (uint64, error) {
func (cst *clientStream) setupPayment(ctx context.Context, toSend types.BigInt) (api.PaymentInfo, error) {
amount := types.BigAdd(cst.transferred, toSend)

sv, err := cst.payapi.PaychVoucherCreate(ctx, cst.paych, amount, cst.lane)
sv, err := cst.node.CreatePaymentVoucher(ctx, cst.paych, amount, cst.lane)
if err != nil {
return api.PaymentInfo{}, err
}
Expand Down
9 changes: 4 additions & 5 deletions retrieval/impl/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (
"github.com/libp2p/go-libp2p-core/network"
"golang.org/x/xerrors"

"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/address"
"github.com/filecoin-project/lotus/chain/types"
Expand All @@ -34,18 +33,18 @@ type provider struct {

// TODO: Replace with RetrievalProviderNode for
// https://github.com/filecoin-project/go-retrieval-market-project/issues/4
full RetrMinerAPI
node retrievalmarket.RetrievalProviderNode

pricePerByte retrievalmarket.BigInt

subscribers []retrievalmarket.ProviderSubscriber
}

// NewProvider returns a new retrieval provider
func NewProvider(sblks *sectorblocks.SectorBlocks, full api.FullNode) retrievalmarket.RetrievalProvider {
func NewProvider(sblks *sectorblocks.SectorBlocks, node retrievalmarket.RetrievalProviderNode) retrievalmarket.RetrievalProvider {
return &provider{
sectorBlocks: sblks,
full: full,
node: node,

pricePerByte: types.NewInt(2), // TODO: allow setting
}
Expand Down Expand Up @@ -202,7 +201,7 @@ func (hnd *handlerDeal) handleNext() (bool, error) {
}

expPayment := types.BigMul(hnd.p.pricePerByte, types.NewInt(deal.Params.Unixfs0.Size))
if _, err := hnd.p.full.PaychVoucherAdd(context.TODO(), deal.Payment.Channel, deal.Payment.Vouchers[0], nil, expPayment); err != nil {
if _, err := hnd.p.node.SavePaymentVoucher(context.TODO(), deal.Payment.Channel, deal.Payment.Vouchers[0], nil, expPayment); err != nil {
return false, xerrors.Errorf("processing retrieval payment: %w", err)
}

Expand Down
41 changes: 41 additions & 0 deletions retrievaladapter/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package retrievaladapter

import (
"context"

payapi "github.com/filecoin-project/lotus/node/impl/paych"
"github.com/filecoin-project/lotus/paych"
retrievalmarket "github.com/filecoin-project/lotus/retrieval"
)

type retrievalClientNode struct {
pmgr *paych.Manager
payapi payapi.PaychAPI
}

// NewRetrievalClientNode returns a new node adapter for a retrieval client that talks to the
// Lotus Node
func NewRetrievalClientNode(pmgr *paych.Manager, payapi payapi.PaychAPI) retrievalmarket.RetrievalClientNode {
return &retrievalClientNode{pmgr: pmgr, payapi: payapi}
}

// GetOrCreatePaymentChannel sets up a new payment channel if one does not exist
// between a client and a miner and insures the client has the given amount of funds available in the channel
func (rcn *retrievalClientNode) GetOrCreatePaymentChannel(ctx context.Context, clientAddress retrievalmarket.Address, minerAddress retrievalmarket.Address, clientFundsAvailable retrievalmarket.BigInt) (retrievalmarket.Address, error) {
paych, _, err := rcn.pmgr.GetPaych(ctx, clientAddress, minerAddress, clientFundsAvailable)
return paych, err
}

// Allocate late creates a lane within a payment channel so that calls to
// CreatePaymentVoucher will automatically make vouchers only for the difference
// in total
func (rcn *retrievalClientNode) AllocateLane(paymentChannel retrievalmarket.Address) (uint64, error) {
return rcn.pmgr.AllocateLane(paymentChannel)
}

// CreatePaymentVoucher creates a new payment voucher in the given lane for a
// given payment channel so that all the payment vouchers in the lane add up
// to the given amount (so the payment voucher will be for the difference)
func (rcn *retrievalClientNode) CreatePaymentVoucher(ctx context.Context, paymentChannel retrievalmarket.Address, amount retrievalmarket.BigInt, lane uint64) (*retrievalmarket.SignedVoucher, error) {
return rcn.payapi.PaychVoucherCreate(ctx, paymentChannel, amount, lane)
}
23 changes: 23 additions & 0 deletions retrievaladapter/provider.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package retrievaladapter

import (
"context"

"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/chain/address"
retrievalmarket "github.com/filecoin-project/lotus/retrieval"
)

type retrievalProviderNode struct {
full api.FullNode
}

// NewRetrievalProviderNode returns a new node adapter for a retrieval provider that talks to the
// Lotus Node
func NewRetrievalProviderNode(full api.FullNode) retrievalmarket.RetrievalProviderNode {
return &retrievalProviderNode{full}
}

func (rpn *retrievalProviderNode) SavePaymentVoucher(ctx context.Context, paymentChannel address.Address, voucher *retrievalmarket.SignedVoucher, proof []byte, expectedAmount retrievalmarket.BigInt) (retrievalmarket.BigInt, error) {
return rpn.full.PaychVoucherAdd(ctx, paymentChannel, voucher, proof, expectedAmount)
}

0 comments on commit 8cd06a5

Please sign in to comment.