diff --git a/docs/retrievalprovider.mmd b/docs/retrievalprovider.mmd index 9ac9149e..40a6d8fe 100644 --- a/docs/retrievalprovider.mmd +++ b/docs/retrievalprovider.mmd @@ -22,6 +22,7 @@ stateDiagram-v2 The following events are not shown cause they can trigger from any state. ProviderEventDataTransferError - transitions state to DealStatusErrored + ProviderEventMultiStoreError - transitions state to DealStatusErrored end note 0 --> 0 : ProviderEventOpen 0 --> 1 : ProviderEventDealAccepted diff --git a/docs/retrievalprovider.mmd.png b/docs/retrievalprovider.mmd.png index ebd2f0ad..b0ef150f 100644 Binary files a/docs/retrievalprovider.mmd.png and b/docs/retrievalprovider.mmd.png differ diff --git a/docs/retrievalprovider.mmd.svg b/docs/retrievalprovider.mmd.svg index 6efd11a8..b8ab7567 100644 --- a/docs/retrievalprovider.mmd.svg +++ b/docs/retrievalprovider.mmd.svg @@ -1,6 +1,6 @@ -ProviderEventOpenProviderEventDealAcceptedProviderEventDealAcceptedProviderEventUnsealErrorProviderEventUnsealCompleteProviderEventBlockSentProviderEventBlockSentProviderEventBlocksCompletedProviderEventPaymentRequestedProviderEventPaymentRequestedProviderEventPaymentRequestedProviderEventPaymentRequestedProviderEventSaveVoucherFailedProviderEventSaveVoucherFailedProviderEventPartialPaymentReceivedProviderEventPartialPaymentReceivedProviderEventPaymentReceivedProviderEventPaymentReceivedProviderEventPaymentReceivedProviderEventCompleteProviderEventCleanupCompleteProviderEventCancelCompleteDealStatusNewDealStatusUnsealingOn entry runs UnsealDataDealStatusUnsealedOn entry runs UnpauseDealDealStatusFundsNeededUnsealOn entry runs TrackTransferDealStatusFailedOn entry runs CancelDealDealStatusFundsNeededDealStatusOngoingDealStatusFundsNeededLastPaymentDealStatusCompletedDealStatusErroredDealStatusBlocksCompleteDealStatusFinalizingDealStatusCompletingOn entry runs CleanupDealThe following events are not shown cause they can trigger from any state.ProviderEventDataTransferError - transitions state to DealStatusErroredProviderEventMultiStoreError - transitions state to DealStatusErrored \ No newline at end of file diff --git a/retrievalmarket/client.go b/retrievalmarket/client.go index ef20c261..0a4bf9aa 100644 --- a/retrievalmarket/client.go +++ b/retrievalmarket/client.go @@ -7,6 +7,7 @@ import ( "github.com/libp2p/go-libp2p-core/peer" "github.com/filecoin-project/go-address" + "github.com/filecoin-project/go-multistore" "github.com/filecoin-project/specs-actors/actors/abi" ) @@ -37,6 +38,7 @@ type RetrievalClient interface { miner peer.ID, clientWallet address.Address, minerWallet address.Address, + storeID *multistore.StoreID, ) (DealID, error) // SubscribeToEvents listens for events that happen related to client retrievals diff --git a/retrievalmarket/events.go b/retrievalmarket/events.go index 0750df6c..0142666c 100644 --- a/retrievalmarket/events.go +++ b/retrievalmarket/events.go @@ -185,6 +185,9 @@ const ( // ProviderEventCleanupComplete happens when a deal is finished cleaning up and enters a complete state ProviderEventCleanupComplete + + // ProviderEventMultiStoreError occurs when an error happens attempting to operate on the multistore + ProviderEventMultiStoreError ) // ProviderEvents is a human readable map of provider event name -> event description @@ -205,4 +208,5 @@ var ProviderEvents = map[ProviderEvent]string{ ProviderEventDataTransferError: "ProviderEventDataTransferError", ProviderEventCancelComplete: "ProviderEventCancelComplete", ProviderEventCleanupComplete: "ProviderEventCleanupComplete", + ProviderEventMultiStoreError: "ProviderEventMultiStoreError", } diff --git a/retrievalmarket/impl/client.go b/retrievalmarket/impl/client.go index 27e6702a..a47e3d51 100644 --- a/retrievalmarket/impl/client.go +++ b/retrievalmarket/impl/client.go @@ -7,13 +7,13 @@ import ( "github.com/hannahhoward/go-pubsub" "github.com/ipfs/go-cid" "github.com/ipfs/go-datastore" - blockstore "github.com/ipfs/go-ipfs-blockstore" logging "github.com/ipfs/go-log/v2" "github.com/libp2p/go-libp2p-core/peer" "golang.org/x/xerrors" "github.com/filecoin-project/go-address" datatransfer "github.com/filecoin-project/go-data-transfer" + "github.com/filecoin-project/go-multistore" "github.com/filecoin-project/go-statemachine/fsm" "github.com/filecoin-project/go-storedcounter" "github.com/filecoin-project/specs-actors/actors/abi" @@ -32,7 +32,7 @@ var log = logging.Logger("retrieval") type Client struct { network rmnet.RetrievalMarketNetwork dataTransfer datatransfer.Manager - bs blockstore.Blockstore + multiStore *multistore.MultiStore node retrievalmarket.RetrievalClientNode storedCounter *storedcounter.StoredCounter @@ -64,7 +64,7 @@ var _ retrievalmarket.RetrievalClient = &Client{} // NewClient creates a new retrieval client func NewClient( network rmnet.RetrievalMarketNetwork, - bs blockstore.Blockstore, + multiStore *multistore.MultiStore, dataTransfer datatransfer.Manager, node retrievalmarket.RetrievalClientNode, resolver retrievalmarket.PeerResolver, @@ -73,7 +73,7 @@ func NewClient( ) (retrievalmarket.RetrievalClient, error) { c := &Client{ network: network, - bs: bs, + multiStore: multiStore, dataTransfer: dataTransfer, node: node, resolver: resolver, @@ -106,6 +106,10 @@ func NewClient( return nil, err } dataTransfer.SubscribeToEvents(dtutils.ClientDataTransferSubscriber(c.stateMachines)) + err = dataTransfer.RegisterTransportConfigurer(&retrievalmarket.DealProposal{}, dtutils.TransportConfigurer(network.ID(), &clientStoreGetter{c})) + if err != nil { + return nil, err + } return c, nil } @@ -177,14 +181,20 @@ From then on, the statemachine controls the deal flow in the client. Other compo Documentation of the client state machine can be found at https://godoc.org/github.com/filecoin-project/go-fil-markets/retrievalmarket/impl/clientstates */ -func (c *Client) Retrieve(ctx context.Context, payloadCID cid.Cid, params retrievalmarket.Params, totalFunds abi.TokenAmount, miner peer.ID, clientWallet address.Address, minerWallet address.Address) (retrievalmarket.DealID, error) { +func (c *Client) Retrieve(ctx context.Context, payloadCID cid.Cid, params retrievalmarket.Params, totalFunds abi.TokenAmount, miner peer.ID, clientWallet address.Address, minerWallet address.Address, storeID *multistore.StoreID) (retrievalmarket.DealID, error) { var err error next, err := c.storedCounter.Next() if err != nil { return 0, err } + // make sure the store is loadable + if storeID != nil { + _, err = c.multiStore.Get(*storeID) + if err != nil { + return 0, err + } + } dealID := retrievalmarket.DealID(next) - dealState := retrievalmarket.ClientDealState{ DealProposal: retrievalmarket.DealProposal{ PayloadCID: payloadCID, @@ -202,6 +212,7 @@ func (c *Client) Retrieve(ctx context.Context, payloadCID cid.Cid, params retrie Status: retrievalmarket.DealStatusNew, Sender: miner, UnsealFundsPaid: big.Zero(), + StoreID: storeID, } // start the deal processing @@ -286,6 +297,22 @@ func (c *clientDealEnvironment) CloseDataTransfer(ctx context.Context, channelID return c.c.dataTransfer.CloseDataTransferChannel(ctx, channelID) } +type clientStoreGetter struct { + c *Client +} + +func (csg *clientStoreGetter) Get(otherPeer peer.ID, dealID retrievalmarket.DealID) (*multistore.Store, error) { + var deal retrievalmarket.ClientDealState + err := csg.c.stateMachines.Get(dealID).Get(&deal) + if err != nil { + return nil, err + } + if deal.StoreID == nil { + return nil, nil + } + return csg.c.multiStore.Get(*deal.StoreID) +} + // ClientFSMParameterSpec is a valid set of parameters for a client deal FSM - used in doc generation var ClientFSMParameterSpec = fsm.Parameters{ Environment: &clientDealEnvironment{}, diff --git a/retrievalmarket/impl/client_test.go b/retrievalmarket/impl/client_test.go index f3ef89fa..5440a09d 100644 --- a/retrievalmarket/impl/client_test.go +++ b/retrievalmarket/impl/client_test.go @@ -7,12 +7,12 @@ import ( "github.com/ipfs/go-datastore" dss "github.com/ipfs/go-datastore/sync" - bstore "github.com/ipfs/go-ipfs-blockstore" "github.com/libp2p/go-libp2p-core/peer" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/filecoin-project/go-address" + "github.com/filecoin-project/go-multistore" "github.com/filecoin-project/go-storedcounter" "github.com/filecoin-project/specs-actors/actors/abi" @@ -28,12 +28,13 @@ func TestClient_Construction(t *testing.T) { ds := dss.MutexWrap(datastore.NewMapDatastore()) storedCounter := storedcounter.New(ds, datastore.NewKey("nextDealID")) - bs := bstore.NewBlockstore(ds) + multiStore, err := multistore.NewMultiDstore(ds) + require.NoError(t, err) dt := tut.NewTestDataTransfer() net := tut.NewTestRetrievalMarketNetwork(tut.TestNetworkParams{}) - _, err := retrievalimpl.NewClient( + _, err = retrievalimpl.NewClient( net, - bs, + multiStore, dt, testnodes.NewTestRetrievalClientNode(testnodes.TestRetrievalClientNodeParams{}), &tut.TestPeerResolver{}, @@ -50,6 +51,9 @@ func TestClient_Construction(t *testing.T) { require.True(t, ok) _, ok = dt.RegisteredVoucherTypes[1].VoucherType.(*retrievalmarket.DealPayment) require.True(t, ok) + require.Len(t, dt.RegisteredTransportConfigurers, 1) + _, ok = dt.RegisteredTransportConfigurers[0].VoucherType.(*retrievalmarket.DealProposal) + require.True(t, ok) } func TestClient_Query(t *testing.T) { @@ -57,7 +61,8 @@ func TestClient_Query(t *testing.T) { ds := dss.MutexWrap(datastore.NewMapDatastore()) storedCounter := storedcounter.New(ds, datastore.NewKey("nextDealID")) - bs := bstore.NewBlockstore(ds) + multiStore, err := multistore.NewMultiDstore(ds) + require.NoError(t, err) dt := tut.NewTestDataTransfer() pcid := tut.GenerateCids(1)[0] @@ -92,7 +97,7 @@ func TestClient_Query(t *testing.T) { }) c, err := retrievalimpl.NewClient( net, - bs, + multiStore, dt, testnodes.NewTestRetrievalClientNode(testnodes.TestRetrievalClientNodeParams{}), &tut.TestPeerResolver{}, @@ -112,7 +117,7 @@ func TestClient_Query(t *testing.T) { }) c, err := retrievalimpl.NewClient( net, - bs, + multiStore, dt, testnodes.NewTestRetrievalClientNode(testnodes.TestRetrievalClientNodeParams{}), &tut.TestPeerResolver{}, @@ -139,7 +144,7 @@ func TestClient_Query(t *testing.T) { }) c, err := retrievalimpl.NewClient( net, - bs, + multiStore, dt, testnodes.NewTestRetrievalClientNode(testnodes.TestRetrievalClientNodeParams{}), &tut.TestPeerResolver{}, @@ -165,7 +170,7 @@ func TestClient_Query(t *testing.T) { }) c, err := retrievalimpl.NewClient( net, - bs, + multiStore, dt, testnodes.NewTestRetrievalClientNode(testnodes.TestRetrievalClientNodeParams{}), &tut.TestPeerResolver{}, @@ -182,7 +187,8 @@ func TestClient_Query(t *testing.T) { func TestClient_FindProviders(t *testing.T) { ds := dss.MutexWrap(datastore.NewMapDatastore()) storedCounter := storedcounter.New(ds, datastore.NewKey("nextDealID")) - bs := bstore.NewBlockstore(ds) + multiStore, err := multistore.NewMultiDstore(ds) + require.NoError(t, err) dt := tut.NewTestDataTransfer() expectedPeer := peer.ID("somevalue") @@ -200,7 +206,7 @@ func TestClient_FindProviders(t *testing.T) { peers := tut.RequireGenerateRetrievalPeers(t, 3) testResolver := tut.TestPeerResolver{Peers: peers} - c, err := retrievalimpl.NewClient(net, bs, dt, &testnodes.TestRetrievalClientNode{}, &testResolver, ds, storedCounter) + c, err := retrievalimpl.NewClient(net, multiStore, dt, &testnodes.TestRetrievalClientNode{}, &testResolver, ds, storedCounter) require.NoError(t, err) testCid := tut.GenerateCids(1)[0] @@ -209,7 +215,7 @@ func TestClient_FindProviders(t *testing.T) { t.Run("when there is an error, returns empty provider list", func(t *testing.T) { testResolver := tut.TestPeerResolver{Peers: []retrievalmarket.RetrievalPeer{}, ResolverError: errors.New("boom")} - c, err := retrievalimpl.NewClient(net, bs, dt, &testnodes.TestRetrievalClientNode{}, &testResolver, ds, storedCounter) + c, err := retrievalimpl.NewClient(net, multiStore, dt, &testnodes.TestRetrievalClientNode{}, &testResolver, ds, storedCounter) require.NoError(t, err) badCid := tut.GenerateCids(1)[0] @@ -218,7 +224,7 @@ func TestClient_FindProviders(t *testing.T) { t.Run("when there are no providers", func(t *testing.T) { testResolver := tut.TestPeerResolver{Peers: []retrievalmarket.RetrievalPeer{}} - c, err := retrievalimpl.NewClient(net, bs, dt, &testnodes.TestRetrievalClientNode{}, &testResolver, ds, storedCounter) + c, err := retrievalimpl.NewClient(net, multiStore, dt, &testnodes.TestRetrievalClientNode{}, &testResolver, ds, storedCounter) require.NoError(t, err) testCid := tut.GenerateCids(1)[0] diff --git a/retrievalmarket/impl/dtutils/dtutils.go b/retrievalmarket/impl/dtutils/dtutils.go index 3648451f..69f6df11 100644 --- a/retrievalmarket/impl/dtutils/dtutils.go +++ b/retrievalmarket/impl/dtutils/dtutils.go @@ -7,8 +7,11 @@ import ( "math" logging "github.com/ipfs/go-log/v2" + "github.com/ipld/go-ipld-prime" + peer "github.com/libp2p/go-libp2p-core/peer" datatransfer "github.com/filecoin-project/go-data-transfer" + "github.com/filecoin-project/go-multistore" "github.com/filecoin-project/go-statemachine/fsm" rm "github.com/filecoin-project/go-fil-markets/retrievalmarket" @@ -134,3 +137,41 @@ func ClientDataTransferSubscriber(deals EventReceiver) datatransfer.Subscriber { } } } + +// StoreGetter retrieves the store for a given proposal cid +type StoreGetter interface { + Get(otherPeer peer.ID, dealID rm.DealID) (*multistore.Store, error) +} + +// StoreConfigurableTransport defines the methods needed to +// configure a data transfer transport use a unique store for a given request +type StoreConfigurableTransport interface { + UseStore(datatransfer.ChannelID, ipld.Loader, ipld.Storer) error +} + +// TransportConfigurer configurers the graphsync transport to use a custom blockstore per deal +func TransportConfigurer(thisPeer peer.ID, storeGetter StoreGetter) datatransfer.TransportConfigurer { + return func(channelID datatransfer.ChannelID, voucher datatransfer.Voucher, transport datatransfer.Transport) { + dealProposal, ok := voucher.(*rm.DealProposal) + if !ok { + return + } + gsTransport, ok := transport.(StoreConfigurableTransport) + if !ok { + return + } + otherPeer := channelID.OtherParty(thisPeer) + store, err := storeGetter.Get(otherPeer, dealProposal.ID) + if err != nil { + log.Errorf("attempting to configure data store: %w", err) + return + } + if store == nil { + return + } + err = gsTransport.UseStore(channelID, store.Loader, store.Storer) + if err != nil { + log.Errorf("attempting to configure data store: %w", err) + } + } +} diff --git a/retrievalmarket/impl/dtutils/dtutils_test.go b/retrievalmarket/impl/dtutils/dtutils_test.go index e1b94c45..bfd84864 100644 --- a/retrievalmarket/impl/dtutils/dtutils_test.go +++ b/retrievalmarket/impl/dtutils/dtutils_test.go @@ -1,13 +1,17 @@ package dtutils_test import ( + "context" "errors" "math/rand" "testing" + "github.com/ipld/go-ipld-prime" + peer "github.com/libp2p/go-libp2p-core/peer" "github.com/stretchr/testify/require" datatransfer "github.com/filecoin-project/go-data-transfer" + "github.com/filecoin-project/go-multistore" "github.com/filecoin-project/go-statemachine/fsm" "github.com/filecoin-project/go-fil-markets/retrievalmarket" @@ -265,3 +269,125 @@ func (fdg *fakeDealGroup) Send(id interface{}, name fsm.EventName, args ...inter fdg.called = true return fdg.returnedErr } + +func TestTransportConfigurer(t *testing.T) { + payloadCID := shared_testutil.GenerateCids(1)[0] + expectedChannelID := shared_testutil.MakeTestChannelID() + expectedDealID := rm.DealID(rand.Uint64()) + thisPeer := expectedChannelID.Initiator + expectedPeer := expectedChannelID.Responder + + testCases := map[string]struct { + voucher datatransfer.Voucher + transport datatransfer.Transport + returnedStore *multistore.Store + returnedStoreErr error + getterCalled bool + useStoreCalled bool + }{ + "non-storage voucher": { + voucher: nil, + getterCalled: false, + }, + "non-configurable transport": { + voucher: &rm.DealProposal{ + PayloadCID: payloadCID, + ID: expectedDealID, + }, + transport: &fakeTransport{}, + getterCalled: false, + }, + "store getter errors": { + voucher: &rm.DealProposal{ + PayloadCID: payloadCID, + ID: expectedDealID, + }, + transport: &fakeGsTransport{Transport: &fakeTransport{}}, + getterCalled: true, + useStoreCalled: false, + returnedStore: nil, + returnedStoreErr: errors.New("something went wrong"), + }, + "store getter succeeds": { + voucher: &rm.DealProposal{ + PayloadCID: payloadCID, + ID: expectedDealID, + }, + transport: &fakeGsTransport{Transport: &fakeTransport{}}, + getterCalled: true, + useStoreCalled: true, + returnedStore: &multistore.Store{}, + returnedStoreErr: nil, + }, + } + for testCase, data := range testCases { + t.Run(testCase, func(t *testing.T) { + storeGetter := &fakeStoreGetter{returnedErr: data.returnedStoreErr, returnedStore: data.returnedStore} + transportConfigurer := dtutils.TransportConfigurer(thisPeer, storeGetter) + transportConfigurer(expectedChannelID, data.voucher, data.transport) + if data.getterCalled { + require.True(t, storeGetter.called) + require.Equal(t, expectedDealID, storeGetter.lastDealID) + require.Equal(t, expectedPeer, storeGetter.lastOtherPeer) + fgt, ok := data.transport.(*fakeGsTransport) + require.True(t, ok) + if data.useStoreCalled { + require.True(t, fgt.called) + require.Equal(t, expectedChannelID, fgt.lastChannelID) + } else { + require.False(t, fgt.called) + } + } else { + require.False(t, storeGetter.called) + } + }) + } +} + +type fakeStoreGetter struct { + lastDealID rm.DealID + lastOtherPeer peer.ID + returnedErr error + returnedStore *multistore.Store + called bool +} + +func (fsg *fakeStoreGetter) Get(otherPeer peer.ID, dealID rm.DealID) (*multistore.Store, error) { + fsg.lastDealID = dealID + fsg.lastOtherPeer = otherPeer + fsg.called = true + return fsg.returnedStore, fsg.returnedErr +} + +type fakeTransport struct{} + +func (ft *fakeTransport) OpenChannel(ctx context.Context, dataSender peer.ID, channelID datatransfer.ChannelID, root ipld.Link, stor ipld.Node, msg datatransfer.Message) error { + return nil +} + +func (ft *fakeTransport) CloseChannel(ctx context.Context, chid datatransfer.ChannelID) error { + return nil +} + +func (ft *fakeTransport) SetEventHandler(events datatransfer.EventsHandler) error { + return nil +} + +func (ft *fakeTransport) CleanupChannel(chid datatransfer.ChannelID) { +} + +type fakeGsTransport struct { + datatransfer.Transport + lastChannelID datatransfer.ChannelID + lastLoader ipld.Loader + lastStorer ipld.Storer + called bool +} + +func (fgt *fakeGsTransport) UseStore(channelID datatransfer.ChannelID, loader ipld.Loader, storer ipld.Storer) error { + fgt.lastChannelID = channelID + fgt.lastLoader = loader + fgt.lastStorer = storer + fgt.called = true + return nil +} diff --git a/retrievalmarket/impl/integration_test.go b/retrievalmarket/impl/integration_test.go index 4c7d31d6..bef7a994 100644 --- a/retrievalmarket/impl/integration_test.go +++ b/retrievalmarket/impl/integration_test.go @@ -18,6 +18,7 @@ import ( "github.com/filecoin-project/go-address" dtimpl "github.com/filecoin-project/go-data-transfer/impl" dtgstransport "github.com/filecoin-project/go-data-transfer/transport/graphsync" + "github.com/filecoin-project/go-multistore" "github.com/filecoin-project/specs-actors/actors/abi" "github.com/filecoin-project/specs-actors/actors/abi/big" "github.com/filecoin-project/specs-actors/actors/builtin/paych" @@ -95,7 +96,7 @@ func requireSetupTestClientAndProvider(bgCtx context.Context, t *testing.T, payC require.NoError(t, err) err = dt1.Start(bgCtx) require.NoError(t, err) - client, err := retrievalimpl.NewClient(nw1, testData.Bs1, dt1, rcNode1, &tut.TestPeerResolver{}, testData.Ds1, testData.RetrievalStoredCounter1) + client, err := retrievalimpl.NewClient(nw1, testData.MultiStore1, dt1, rcNode1, &tut.TestPeerResolver{}, testData.Ds1, testData.RetrievalStoredCounter1) require.NoError(t, err) nw2 := rmnet.NewFromLibp2pHost(testData.Host2) providerNode := testnodes.NewTestRetrievalProviderNode() @@ -131,7 +132,7 @@ func requireSetupTestClientAndProvider(bgCtx context.Context, t *testing.T, payC require.NoError(t, err) err = dt2.Start(bgCtx) require.NoError(t, err) - provider, err := retrievalimpl.NewProvider(paymentAddress, providerNode, nw2, pieceStore, testData.Bs2, dt2, testData.Ds2) + provider, err := retrievalimpl.NewProvider(paymentAddress, providerNode, nw2, pieceStore, testData.MultiStore2, dt2, testData.Ds2) require.NoError(t, err) provider.SetPaymentInterval(expectedQR.MaxPaymentInterval, expectedQR.MaxPaymentIntervalIncrease) @@ -167,6 +168,7 @@ func TestClientCanMakeDealWithProvider(t *testing.T) { selector ipld.Node unsealPrice abi.TokenAmount paramsV1, addFunds bool + skipStores bool }{ {name: "1 block file retrieval succeeds with existing payment channel", filename: "lorem_under_1_block.txt", @@ -213,6 +215,12 @@ func TestClientCanMakeDealWithProvider(t *testing.T) { filesize: 410, voucherAmts: []abi.TokenAmount{abi.NewTokenAmount(410000)}, }, + {name: "succeeds for regular blockstore", + filename: "lorem.txt", + filesize: 19000, + voucherAmts: []abi.TokenAmount{abi.NewTokenAmount(10136000), abi.NewTokenAmount(9784000)}, + skipStores: true, + }, } for i, testCase := range testCases { @@ -228,7 +236,7 @@ func TestClientCanMakeDealWithProvider(t *testing.T) { fpath := filepath.Join("retrievalmarket", "impl", "fixtures", testCase.filename) - pieceLink := testData.LoadUnixFSFile(t, fpath, true) + pieceLink, storeID := testData.LoadUnixFSFileToStore(t, fpath, true) c, ok := pieceLink.(cidlink.Link) require.True(t, ok) payloadCID := c.Cid @@ -255,7 +263,9 @@ func TestClientCanMakeDealWithProvider(t *testing.T) { var pieceInfo piecestore.PieceInfo cio := cario.NewCarIO() var buf bytes.Buffer - err = cio.WriteCar(bgCtx, testData.Bs2, payloadCID, shared.AllSelector(), &buf) + store, err := testData.MultiStore2.Get(storeID) + require.NoError(t, err) + err = cio.WriteCar(bgCtx, store.Bstore, payloadCID, shared.AllSelector(), &buf) require.NoError(t, err) carData := buf.Bytes() sectorID := uint64(100000) @@ -272,12 +282,8 @@ func TestClientCanMakeDealWithProvider(t *testing.T) { } providerNode.ExpectUnseal(sectorID, offset, uint64(len(carData)), carData) // clearout provider blockstore - allCids, err := testData.Bs2.AllKeysChan(bgCtx) + err = testData.MultiStore2.Delete(storeID) require.NoError(t, err) - for c := range allCids { - err = testData.Bs2.DeleteBlock(c) - require.NoError(t, err) - } decider := rmtesting.TrivialTestDecider if testCase.decider != nil { @@ -302,7 +308,6 @@ func TestClientCanMakeDealWithProvider(t *testing.T) { // ------- SET UP CLIENT nw1 := rmnet.NewFromLibp2pHost(testData.Host1) - createdChan, newLaneAddr, createdVoucher, client, err := setupClient(bgCtx, t, clientPaymentChannel, expectedVoucher, nw1, testData, testCase.addFunds) require.NoError(t, err) @@ -361,8 +366,13 @@ CurrentInterval: %d rmParams = retrievalmarket.NewParamsV0(pricePerByte, paymentInterval, paymentIntervalIncrease) } + var clientStoreID *multistore.StoreID + if !testCase.skipStores { + id := testData.MultiStore1.Next() + clientStoreID = &id + } // *** Retrieve the piece - did, err := client.Retrieve(bgCtx, payloadCID, rmParams, expectedTotal, retrievalPeer.ID, clientPaymentChannel, retrievalPeer.Address) + did, err := client.Retrieve(bgCtx, payloadCID, rmParams, expectedTotal, retrievalPeer.ID, clientPaymentChannel, retrievalPeer.Address, clientStoreID) assert.Equal(t, did, retrievalmarket.DealID(0)) require.NoError(t, err) @@ -404,7 +414,11 @@ CurrentInterval: %d } // verify that the provider saved the same voucher values providerNode.VerifyExpectations(t) - testData.VerifyFileTransferred(t, pieceLink, false, testCase.filesize) + if testCase.skipStores { + testData.VerifyFileTransferred(t, pieceLink, false, testCase.filesize) + } else { + testData.VerifyFileTransferredIntoStore(t, pieceLink, *clientStoreID, false, testCase.filesize) + } }) } @@ -456,7 +470,7 @@ func setupClient( err = dt1.Start(ctx) require.NoError(t, err) - client, err := retrievalimpl.NewClient(nw1, testData.Bs1, dt1, clientNode, &tut.TestPeerResolver{}, testData.Ds1, testData.RetrievalStoredCounter1) + client, err := retrievalimpl.NewClient(nw1, testData.MultiStore1, dt1, clientNode, &tut.TestPeerResolver{}, testData.Ds1, testData.RetrievalStoredCounter1) return &createdChan, &newLaneAddr, &createdVoucher, client, err } @@ -490,7 +504,7 @@ func setupProvider( require.NoError(t, err) provider, err := retrievalimpl.NewProvider(providerPaymentAddr, providerNode, nw2, - pieceStore, testData.Bs2, dt2, testData.Ds2, + pieceStore, testData.MultiStore2, dt2, testData.Ds2, retrievalimpl.DealDeciderOpt(decider)) require.NoError(t, err) provider.SetPaymentInterval(expectedQR.MaxPaymentInterval, expectedQR.MaxPaymentIntervalIncrease) diff --git a/retrievalmarket/impl/provider.go b/retrievalmarket/impl/provider.go index d009860f..b6b1ab9f 100644 --- a/retrievalmarket/impl/provider.go +++ b/retrievalmarket/impl/provider.go @@ -7,11 +7,11 @@ import ( "github.com/hannahhoward/go-pubsub" "github.com/ipfs/go-cid" "github.com/ipfs/go-datastore" - blockstore "github.com/ipfs/go-ipfs-blockstore" "golang.org/x/xerrors" "github.com/filecoin-project/go-address" datatransfer "github.com/filecoin-project/go-data-transfer" + "github.com/filecoin-project/go-multistore" "github.com/filecoin-project/go-statemachine/fsm" "github.com/filecoin-project/specs-actors/actors/abi" @@ -31,7 +31,7 @@ type DealDecider func(ctx context.Context, state retrievalmarket.ProviderDealSta // Provider is the production implementation of the RetrievalProvider interface type Provider struct { - bs blockstore.Blockstore + multiStore *multistore.MultiStore dataTransfer datatransfer.Manager node retrievalmarket.RetrievalProviderNode network rmnet.RetrievalMarketNetwork @@ -92,14 +92,14 @@ func NewProvider(minerAddress address.Address, node retrievalmarket.RetrievalProviderNode, network rmnet.RetrievalMarketNetwork, pieceStore piecestore.PieceStore, - bs blockstore.Blockstore, + multiStore *multistore.MultiStore, dataTransfer datatransfer.Manager, ds datastore.Batching, opts ...RetrievalProviderOption, ) (retrievalmarket.RetrievalProvider, error) { p := &Provider{ - bs: bs, + multiStore: multiStore, dataTransfer: dataTransfer, node: node, network: network, @@ -138,6 +138,11 @@ func NewProvider(minerAddress address.Address, return nil, err } dataTransfer.SubscribeToEvents(dtutils.ProviderDataTransferSubscriber(p.stateMachines)) + err = p.dataTransfer.RegisterTransportConfigurer(&retrievalmarket.DealProposal{}, + dtutils.TransportConfigurer(network.ID(), &providerStoreGetter{p})) + if err != nil { + return nil, err + } return p, nil } diff --git a/retrievalmarket/impl/provider_environments.go b/retrievalmarket/impl/provider_environments.go index f796bd8d..8d1d3681 100644 --- a/retrievalmarket/impl/provider_environments.go +++ b/retrievalmarket/impl/provider_environments.go @@ -6,15 +6,18 @@ import ( "io" "github.com/ipfs/go-cid" + "github.com/libp2p/go-libp2p-core/peer" "golang.org/x/xerrors" datatransfer "github.com/filecoin-project/go-data-transfer" + "github.com/filecoin-project/go-multistore" "github.com/filecoin-project/specs-actors/actors/abi" "github.com/filecoin-project/specs-actors/actors/abi/big" "github.com/filecoin-project/go-fil-markets/pieceio/cario" "github.com/filecoin-project/go-fil-markets/piecestore" "github.com/filecoin-project/go-fil-markets/retrievalmarket" + "github.com/filecoin-project/go-fil-markets/retrievalmarket/impl/dtutils" "github.com/filecoin-project/go-fil-markets/retrievalmarket/impl/providerstates" "github.com/filecoin-project/go-fil-markets/retrievalmarket/impl/requestvalidation" ) @@ -72,6 +75,13 @@ func (pve *providerValidationEnvironment) BeginTracking(pds retrievalmarket.Prov return pve.p.stateMachines.Send(pds.Identifier(), retrievalmarket.ProviderEventOpen) } +// NextStoreID allocates a store for this deal +func (pve *providerValidationEnvironment) NextStoreID() (multistore.StoreID, error) { + storeID := pve.p.multiStore.Next() + _, err := pve.p.multiStore.Get(storeID) + return storeID, err +} + type providerRevalidatorEnvironment struct { p *Provider } @@ -101,8 +111,12 @@ func (pde *providerDealEnvironment) Node() retrievalmarket.RetrievalProviderNode return pde.p.node } -func (pde *providerDealEnvironment) ReadIntoBlockstore(pieceData io.Reader) error { - _, err := cario.NewCarIO().LoadCar(pde.p.bs, pieceData) +func (pde *providerDealEnvironment) ReadIntoBlockstore(storeID multistore.StoreID, pieceData io.Reader) error { + store, err := pde.p.multiStore.Get(storeID) + if err != nil { + return err + } + _, err = cario.NewCarIO().LoadCar(store.Bstore, pieceData) return err } @@ -124,6 +138,9 @@ func (pde *providerDealEnvironment) CloseDataTransfer(ctx context.Context, chid return pde.p.dataTransfer.CloseDataTransferChannel(ctx, chid) } +func (pde *providerDealEnvironment) DeleteStore(storeID multistore.StoreID) error { + return pde.p.multiStore.Delete(storeID) +} func getPieceInfoFromCid(pieceStore piecestore.PieceStore, payloadCID, pieceCID cid.Cid) (piecestore.PieceInfo, error) { cidInfo, err := pieceStore.GetCIDInfo(payloadCID) if err != nil { @@ -144,3 +161,18 @@ func getPieceInfoFromCid(pieceStore piecestore.PieceStore, payloadCID, pieceCID } return piecestore.PieceInfoUndefined, xerrors.Errorf("could not locate piece: %w", lastErr) } + +var _ dtutils.StoreGetter = &providerStoreGetter{} + +type providerStoreGetter struct { + p *Provider +} + +func (psg *providerStoreGetter) Get(otherPeer peer.ID, dealID retrievalmarket.DealID) (*multistore.Store, error) { + var deal retrievalmarket.ProviderDealState + err := psg.p.stateMachines.GetSync(context.TODO(), retrievalmarket.ProviderDealIdentifier{Receiver: otherPeer, DealID: dealID}, &deal) + if err != nil { + return nil, err + } + return psg.p.multiStore.Get(deal.StoreID) +} diff --git a/retrievalmarket/impl/provider_test.go b/retrievalmarket/impl/provider_test.go index 83d8ed93..ec20492c 100644 --- a/retrievalmarket/impl/provider_test.go +++ b/retrievalmarket/impl/provider_test.go @@ -7,12 +7,12 @@ import ( "github.com/ipfs/go-cid" "github.com/ipfs/go-datastore" dss "github.com/ipfs/go-datastore/sync" - bstore "github.com/ipfs/go-ipfs-blockstore" "github.com/libp2p/go-libp2p-core/peer" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/filecoin-project/go-address" + "github.com/filecoin-project/go-multistore" "github.com/filecoin-project/specs-actors/actors/abi" spect "github.com/filecoin-project/specs-actors/support/testing" @@ -64,13 +64,14 @@ func TestHandleQueryStream(t *testing.T) { return qs } - receiveStreamOnProvider := func(qs network.RetrievalQueryStream, pieceStore piecestore.PieceStore) { + receiveStreamOnProvider := func(t *testing.T, qs network.RetrievalQueryStream, pieceStore piecestore.PieceStore) { node := testnodes.NewTestRetrievalProviderNode() ds := dss.MutexWrap(datastore.NewMapDatastore()) - bs := bstore.NewBlockstore(ds) + multiStore, err := multistore.NewMultiDstore(ds) + require.NoError(t, err) dt := tut.NewTestDataTransfer() net := tut.NewTestRetrievalMarketNetwork(tut.TestNetworkParams{}) - c, err := retrievalimpl.NewProvider(expectedAddress, node, net, pieceStore, bs, dt, ds) + c, err := retrievalimpl.NewProvider(expectedAddress, node, net, pieceStore, multiStore, dt, ds) require.NoError(t, err) c.SetPricePerByte(expectedPricePerByte) c.SetPaymentInterval(expectedPaymentInterval, expectedPaymentIntervalIncrease) @@ -151,7 +152,7 @@ func TestHandleQueryStream(t *testing.T) { tc.expFunc(t, pieceStore) - receiveStreamOnProvider(qs, pieceStore) + receiveStreamOnProvider(t, qs, pieceStore) actualResp, err := qs.ReadQueryResponse() pieceStore.VerifyExpectations(t) @@ -177,7 +178,7 @@ func TestHandleQueryStream(t *testing.T) { require.NoError(t, err) pieceStore := tut.NewTestPieceStore() - receiveStreamOnProvider(qs, pieceStore) + receiveStreamOnProvider(t, qs, pieceStore) response, err := qs.ReadQueryResponse() require.NoError(t, err) @@ -189,7 +190,7 @@ func TestHandleQueryStream(t *testing.T) { qs := readWriteQueryStream() pieceStore := tut.NewTestPieceStore() - receiveStreamOnProvider(qs, pieceStore) + receiveStreamOnProvider(t, qs, pieceStore) response, err := qs.ReadQueryResponse() require.NotNil(t, err) @@ -212,7 +213,7 @@ func TestHandleQueryStream(t *testing.T) { pieceStore.ExpectCID(payloadCID, expectedCIDInfo) pieceStore.ExpectPiece(expectedPieceCID, expectedPiece) - receiveStreamOnProvider(qs, pieceStore) + receiveStreamOnProvider(t, qs, pieceStore) pieceStore.VerifyExpectations(t) }) @@ -221,14 +222,15 @@ func TestHandleQueryStream(t *testing.T) { func TestProvider_Construct(t *testing.T) { ds := datastore.NewMapDatastore() - bs := bstore.NewBlockstore(ds) + multiStore, err := multistore.NewMultiDstore(ds) + require.NoError(t, err) dt := tut.NewTestDataTransfer() - _, err := retrievalimpl.NewProvider( + _, err = retrievalimpl.NewProvider( spect.NewIDAddr(t, 2344), testnodes.NewTestRetrievalProviderNode(), tut.NewTestRetrievalMarketNetwork(tut.TestNetworkParams{}), tut.NewTestPieceStore(), - bs, + multiStore, dt, ds, ) @@ -247,20 +249,23 @@ func TestProvider_Construct(t *testing.T) { require.True(t, ok) _, ok = dt.RegisteredRevalidators[0].Revalidator.(*requestvalidation.ProviderRevalidator) require.True(t, ok) - + require.Len(t, dt.RegisteredTransportConfigurers, 1) + _, ok = dt.RegisteredTransportConfigurers[0].VoucherType.(*retrievalmarket.DealProposal) + require.True(t, ok) } func TestProviderConfigOpts(t *testing.T) { var sawOpt int opt1 := func(p *retrievalimpl.Provider) { sawOpt++ } opt2 := func(p *retrievalimpl.Provider) { sawOpt += 2 } ds := datastore.NewMapDatastore() - bs := bstore.NewBlockstore(ds) + multiStore, err := multistore.NewMultiDstore(ds) + require.NoError(t, err) p, err := retrievalimpl.NewProvider( spect.NewIDAddr(t, 2344), testnodes.NewTestRetrievalProviderNode(), tut.NewTestRetrievalMarketNetwork(tut.TestNetworkParams{}), tut.NewTestPieceStore(), - bs, + multiStore, tut.NewTestDataTransfer(), ds, opt1, opt2, ) @@ -280,7 +285,7 @@ func TestProviderConfigOpts(t *testing.T) { testnodes.NewTestRetrievalProviderNode(), tut.NewTestRetrievalMarketNetwork(tut.TestNetworkParams{}), tut.NewTestPieceStore(), - bs, + multiStore, tut.NewTestDataTransfer(), ds, ddOpt) require.NoError(t, err) diff --git a/retrievalmarket/impl/providerstates/provider_fsm.go b/retrievalmarket/impl/providerstates/provider_fsm.go index b4fd5b74..445de68a 100644 --- a/retrievalmarket/impl/providerstates/provider_fsm.go +++ b/retrievalmarket/impl/providerstates/provider_fsm.go @@ -96,6 +96,11 @@ var ProviderEvents = fsm.Events{ fsm.Event(rm.ProviderEventDataTransferError). FromAny().To(rm.DealStatusErrored). Action(recordError), + + // multistore errors + fsm.Event(rm.ProviderEventMultiStoreError). + FromAny().To(rm.DealStatusErrored). + Action(recordError), } // ProviderStateEntryFuncs are the handlers for different states in a retrieval provider diff --git a/retrievalmarket/impl/providerstates/provider_states.go b/retrievalmarket/impl/providerstates/provider_states.go index 499294d8..7146a2d8 100644 --- a/retrievalmarket/impl/providerstates/provider_states.go +++ b/retrievalmarket/impl/providerstates/provider_states.go @@ -7,6 +7,7 @@ import ( "golang.org/x/xerrors" datatransfer "github.com/filecoin-project/go-data-transfer" + "github.com/filecoin-project/go-multistore" "github.com/filecoin-project/go-statemachine/fsm" "github.com/filecoin-project/go-fil-markets/piecestore" @@ -18,9 +19,10 @@ import ( type ProviderDealEnvironment interface { // Node returns the node interface for this deal Node() rm.RetrievalProviderNode - ReadIntoBlockstore(pieceData io.Reader) error + ReadIntoBlockstore(storeID multistore.StoreID, pieceData io.Reader) error TrackTransfer(deal rm.ProviderDealState) error UntrackTransfer(deal rm.ProviderDealState) error + DeleteStore(storeID multistore.StoreID) error ResumeDataTransfer(context.Context, datatransfer.ChannelID) error CloseDataTransfer(context.Context, datatransfer.ChannelID) error } @@ -43,7 +45,7 @@ func UnsealData(ctx fsm.Context, environment ProviderDealEnvironment, deal rm.Pr if err != nil { return ctx.Trigger(rm.ProviderEventUnsealError, err) } - err = environment.ReadIntoBlockstore(reader) + err = environment.ReadIntoBlockstore(deal.StoreID, reader) if err != nil { return ctx.Trigger(rm.ProviderEventUnsealError, err) } @@ -79,11 +81,14 @@ func CancelDeal(ctx fsm.Context, environment ProviderDealEnvironment, deal rm.Pr if err != nil { return ctx.Trigger(rm.ProviderEventDataTransferError, err) } + err = environment.DeleteStore(deal.StoreID) + if err != nil { + return ctx.Trigger(rm.ProviderEventMultiStoreError, err) + } err = environment.CloseDataTransfer(ctx.Context(), deal.ChannelID) if err != nil { return ctx.Trigger(rm.ProviderEventDataTransferError, err) } - return ctx.Trigger(rm.ProviderEventCancelComplete) } @@ -93,5 +98,9 @@ func CleanupDeal(ctx fsm.Context, environment ProviderDealEnvironment, deal rm.P if err != nil { return ctx.Trigger(rm.ProviderEventDataTransferError, err) } + err = environment.DeleteStore(deal.StoreID) + if err != nil { + return ctx.Trigger(rm.ProviderEventMultiStoreError, err) + } return ctx.Trigger(rm.ProviderEventCleanupComplete) } diff --git a/retrievalmarket/impl/providerstates/provider_states_test.go b/retrievalmarket/impl/providerstates/provider_states_test.go index 431887fb..b4281e0d 100644 --- a/retrievalmarket/impl/providerstates/provider_states_test.go +++ b/retrievalmarket/impl/providerstates/provider_states_test.go @@ -178,6 +178,15 @@ func TestCancelDeal(t *testing.T) { require.Equal(t, dealState.Status, rm.DealStatusErrored) require.Equal(t, dealState.Message, "something went wrong untracking") }) + t.Run("error deleting store", func(t *testing.T) { + dealState := makeDealState(rm.DealStatusFailing) + setupEnv := func(fe *rmtesting.TestProviderDealEnvironment) { + fe.DeleteStoreError = errors.New("something went wrong deleting store") + } + runCancelDeal(t, setupEnv, dealState) + require.Equal(t, dealState.Status, rm.DealStatusErrored) + require.Equal(t, dealState.Message, "something went wrong deleting store") + }) t.Run("error closing channel", func(t *testing.T) { dealState := makeDealState(rm.DealStatusFailing) setupEnv := func(fe *rmtesting.TestProviderDealEnvironment) { @@ -221,6 +230,16 @@ func TestCleanupDeal(t *testing.T) { require.Equal(t, dealState.Status, rm.DealStatusErrored) require.Equal(t, dealState.Message, "something went wrong untracking") }) + t.Run("error deleting store", func(t *testing.T) { + dealState := makeDealState(rm.DealStatusCompleting) + setupEnv := func(fe *rmtesting.TestProviderDealEnvironment) { + fe.DeleteStoreError = errors.New("something went wrong deleting store") + } + runCleanupDeal(t, setupEnv, dealState) + require.Equal(t, dealState.Status, rm.DealStatusErrored) + require.Equal(t, dealState.Message, "something went wrong deleting store") + }) + } var dealID = rm.DealID(10) diff --git a/retrievalmarket/impl/requestvalidation/requestvalidation.go b/retrievalmarket/impl/requestvalidation/requestvalidation.go index be9c4f5e..885901e6 100644 --- a/retrievalmarket/impl/requestvalidation/requestvalidation.go +++ b/retrievalmarket/impl/requestvalidation/requestvalidation.go @@ -11,6 +11,7 @@ import ( peer "github.com/libp2p/go-libp2p-core/peer" datatransfer "github.com/filecoin-project/go-data-transfer" + "github.com/filecoin-project/go-multistore" "github.com/filecoin-project/specs-actors/actors/abi" "github.com/filecoin-project/specs-actors/actors/abi/big" @@ -36,6 +37,8 @@ type ValidationEnvironment interface { RunDealDecisioningLogic(ctx context.Context, state retrievalmarket.ProviderDealState) (bool, string, error) // StateMachines returns the FSM Group to begin tracking with BeginTracking(pds retrievalmarket.ProviderDealState) error + // NextStoreID allocates a store for this deal + NextStoreID() (multistore.StoreID, error) } // ProviderRequestValidator validates incoming requests for the Retrieval Provider @@ -133,6 +136,11 @@ func (rv *ProviderRequestValidator) acceptDeal(deal *retrievalmarket.ProviderDea return retrievalmarket.DealStatusRejected, errors.New(reason) } + deal.StoreID, err = rv.env.NextStoreID() + if err != nil { + return retrievalmarket.DealStatusErrored, err + } + if deal.UnsealPrice.GreaterThan(big.Zero()) { return retrievalmarket.DealStatusFundsNeededUnseal, nil } diff --git a/retrievalmarket/impl/requestvalidation/requestvalidation_test.go b/retrievalmarket/impl/requestvalidation/requestvalidation_test.go index ae6cea70..b6619ae2 100644 --- a/retrievalmarket/impl/requestvalidation/requestvalidation_test.go +++ b/retrievalmarket/impl/requestvalidation/requestvalidation_test.go @@ -13,6 +13,7 @@ import ( "github.com/stretchr/testify/require" datatransfer "github.com/filecoin-project/go-data-transfer" + "github.com/filecoin-project/go-multistore" "github.com/filecoin-project/specs-actors/actors/abi" "github.com/filecoin-project/go-fil-markets/piecestore" @@ -127,6 +128,21 @@ func TestValidatePull(t *testing.T) { Message: "something went wrong", }, }, + "store ID error": { + fve: fakeValidationEnvironment{ + RunDealDecisioningLogicAccepted: true, + NextStoreIDError: errors.New("something went wrong"), + }, + baseCid: proposal.PayloadCID, + selector: shared.AllSelector(), + voucher: &proposal, + expectedError: errors.New("something went wrong"), + expectedVoucherResult: &retrievalmarket.DealResponse{ + Status: retrievalmarket.DealStatusErrored, + ID: proposal.ID, + Message: "something went wrong", + }, + }, "begin tracking error": { fve: fakeValidationEnvironment{ BeginTrackingError: errors.New("everything is awful"), @@ -174,6 +190,8 @@ type fakeValidationEnvironment struct { RunDealDecisioningLogicFailReason string RunDealDecisioningLogicError error BeginTrackingError error + NextStoreIDValue multistore.StoreID + NextStoreIDError error } func (fve *fakeValidationEnvironment) GetPiece(c cid.Cid, pieceCID *cid.Cid) (piecestore.PieceInfo, error) { @@ -194,3 +212,7 @@ func (fve *fakeValidationEnvironment) RunDealDecisioningLogic(ctx context.Contex func (fve *fakeValidationEnvironment) BeginTracking(pds retrievalmarket.ProviderDealState) error { return fve.BeginTrackingError } + +func (fve *fakeValidationEnvironment) NextStoreID() (multistore.StoreID, error) { + return fve.NextStoreIDValue, fve.NextStoreIDError +} diff --git a/retrievalmarket/network/libp2p_impl.go b/retrievalmarket/network/libp2p_impl.go index dbdb1504..b5885b07 100644 --- a/retrievalmarket/network/libp2p_impl.go +++ b/retrievalmarket/network/libp2p_impl.go @@ -67,3 +67,7 @@ func (impl *libp2pRetrievalMarketNetwork) handleNewQueryStream(s network.Stream) qs := &queryStream{remotePID, s, buffered} impl.receiver.HandleQueryStream(qs) } + +func (impl *libp2pRetrievalMarketNetwork) ID() peer.ID { + return impl.host.ID() +} diff --git a/retrievalmarket/network/network.go b/retrievalmarket/network/network.go index 8ca39f00..e0bec417 100644 --- a/retrievalmarket/network/network.go +++ b/retrievalmarket/network/network.go @@ -39,4 +39,7 @@ type RetrievalMarketNetwork interface { // StopHandlingRequests unsets the RetrievalReceiver and would perform any other necessary // shutdown logic. StopHandlingRequests() error + + // ID returns the peer id of the host for this network + ID() peer.ID } diff --git a/retrievalmarket/storage_retrieval_integration_test.go b/retrievalmarket/storage_retrieval_integration_test.go index fbdbd224..cc6c0289 100644 --- a/retrievalmarket/storage_retrieval_integration_test.go +++ b/retrievalmarket/storage_retrieval_integration_test.go @@ -163,7 +163,8 @@ func TestStorageRetrieval(t *testing.T) { // *** Retrieve the piece - did, err := rh.Client.Retrieve(bgCtx, sh.PayloadCid, rmParams, expectedTotal, retrievalPeer.ID, *rh.ExpPaych, retrievalPeer.Address) + clientStoreID := sh.TestData.MultiStore1.Next() + did, err := rh.Client.Retrieve(bgCtx, sh.PayloadCid, rmParams, expectedTotal, retrievalPeer.ID, *rh.ExpPaych, retrievalPeer.Address, &clientStoreID) assert.Equal(t, did, retrievalmarket.DealID(0)) require.NoError(t, err) @@ -192,7 +193,7 @@ func TestStorageRetrieval(t *testing.T) { require.Equal(t, retrievalmarket.DealStatusCompleted, providerDealState.Status) require.Equal(t, retrievalmarket.DealStatusCompleted, clientDealState.Status) - sh.TestData.VerifyFileTransferred(t, sh.PieceLink, false, uint64(fsize)) + sh.TestData.VerifyFileTransferredIntoStore(t, sh.PieceLink, clientStoreID, false, uint64(fsize)) } @@ -399,7 +400,7 @@ func newRetrievalHarness(ctx context.Context, t *testing.T, sh *storageHarness, }) nw1 := rmnet.NewFromLibp2pHost(sh.TestData.Host1) - client, err := retrievalimpl.NewClient(nw1, sh.TestData.Bs1, sh.DTClient, clientNode, sh.PeerResolver, sh.TestData.Ds1, sh.TestData.RetrievalStoredCounter1) + client, err := retrievalimpl.NewClient(nw1, sh.TestData.MultiStore1, sh.DTClient, clientNode, sh.PeerResolver, sh.TestData.Ds1, sh.TestData.RetrievalStoredCounter1) require.NoError(t, err) payloadCID := deal.DataRef.Root @@ -440,7 +441,7 @@ func newRetrievalHarness(ctx context.Context, t *testing.T, sh *storageHarness, } pieceStore.ExpectCID(payloadCID, cidInfo) pieceStore.ExpectPiece(expectedPiece, pieceInfo) - provider, err := retrievalimpl.NewProvider(providerPaymentAddr, providerNode, nw2, pieceStore, sh.TestData.Bs2, sh.DTProvider, sh.TestData.Ds2) + provider, err := retrievalimpl.NewProvider(providerPaymentAddr, providerNode, nw2, pieceStore, sh.TestData.MultiStore2, sh.DTProvider, sh.TestData.Ds2) require.NoError(t, err) params := retrievalmarket.Params{ diff --git a/retrievalmarket/testing/test_provider_deal_environment.go b/retrievalmarket/testing/test_provider_deal_environment.go index 57ac758e..0aad2e24 100644 --- a/retrievalmarket/testing/test_provider_deal_environment.go +++ b/retrievalmarket/testing/test_provider_deal_environment.go @@ -6,6 +6,7 @@ import ( "io" datatransfer "github.com/filecoin-project/go-data-transfer" + "github.com/filecoin-project/go-multistore" rm "github.com/filecoin-project/go-fil-markets/retrievalmarket" retrievalimpl "github.com/filecoin-project/go-fil-markets/retrievalmarket/impl" @@ -20,6 +21,7 @@ type TestProviderDealEnvironment struct { TrackTransferError error UntrackTransferError error CloseDataTransferError error + DeleteStoreError error } // NewTestProviderDealEnvironment returns a new TestProviderDealEnvironment instance @@ -34,7 +36,11 @@ func (te *TestProviderDealEnvironment) Node() rm.RetrievalProviderNode { return te.node } -func (te *TestProviderDealEnvironment) ReadIntoBlockstore(pieceData io.Reader) error { +func (te *TestProviderDealEnvironment) DeleteStore(storeID multistore.StoreID) error { + return te.DeleteStoreError +} + +func (te *TestProviderDealEnvironment) ReadIntoBlockstore(storeID multistore.StoreID, pieceData io.Reader) error { return te.ReadIntoBlockstoreError } diff --git a/retrievalmarket/types.go b/retrievalmarket/types.go index a49031ad..beea750b 100644 --- a/retrievalmarket/types.go +++ b/retrievalmarket/types.go @@ -14,6 +14,7 @@ import ( "github.com/filecoin-project/go-address" datatransfer "github.com/filecoin-project/go-data-transfer" + "github.com/filecoin-project/go-multistore" "github.com/filecoin-project/specs-actors/actors/abi" "github.com/filecoin-project/specs-actors/actors/abi/big" "github.com/filecoin-project/specs-actors/actors/builtin/paych" @@ -44,6 +45,7 @@ type PaymentInfo struct { // of a retrieval client type ClientDealState struct { DealProposal + StoreID *multistore.StoreID ChannelID datatransfer.ChannelID LastPaymentRequested bool AllBlocksReceived bool @@ -67,6 +69,7 @@ type ClientDealState struct { // of a retrieval provider type ProviderDealState struct { DealProposal + StoreID multistore.StoreID ChannelID datatransfer.ChannelID PieceInfo *piecestore.PieceInfo Status DealStatus diff --git a/retrievalmarket/types_cbor_gen.go b/retrievalmarket/types_cbor_gen.go index f3e49f9b..334a354c 100644 --- a/retrievalmarket/types_cbor_gen.go +++ b/retrievalmarket/types_cbor_gen.go @@ -7,6 +7,7 @@ import ( "io" "github.com/filecoin-project/go-fil-markets/piecestore" + "github.com/filecoin-project/go-multistore" "github.com/filecoin-project/specs-actors/actors/builtin/paych" "github.com/libp2p/go-libp2p-core/peer" cbg "github.com/whyrusleeping/cbor-gen" @@ -807,7 +808,7 @@ func (t *DealPayment) UnmarshalCBOR(r io.Reader) error { return nil } -var lengthBufClientDealState = []byte{146} +var lengthBufClientDealState = []byte{147} func (t *ClientDealState) MarshalCBOR(w io.Writer) error { if t == nil { @@ -825,6 +826,18 @@ func (t *ClientDealState) MarshalCBOR(w io.Writer) error { return err } + // t.StoreID (multistore.StoreID) (uint64) + + if t.StoreID == nil { + if _, err := w.Write(cbg.CborNull); err != nil { + return err + } + } else { + if err := cbg.WriteMajorTypeHeaderBuf(scratch, w, cbg.MajUnsignedInt, uint64(*t.StoreID)); err != nil { + return err + } + } + // t.ChannelID (datatransfer.ChannelID) (struct) if err := t.ChannelID.MarshalCBOR(w); err != nil { return err @@ -952,7 +965,7 @@ func (t *ClientDealState) UnmarshalCBOR(r io.Reader) error { return fmt.Errorf("cbor input should be of type array") } - if extra != 18 { + if extra != 19 { return fmt.Errorf("cbor input had wrong number of fields") } @@ -964,6 +977,32 @@ func (t *ClientDealState) UnmarshalCBOR(r io.Reader) error { return xerrors.Errorf("unmarshaling t.DealProposal: %w", err) } + } + // t.StoreID (multistore.StoreID) (uint64) + + { + + pb, err := br.PeekByte() + if err != nil { + return err + } + if pb == cbg.CborNull[0] { + var nbuf [1]byte + if _, err := br.Read(nbuf[:]); err != nil { + return err + } + } else { + maj, extra, err = cbg.CborReadHeaderBuf(br, scratch) + if err != nil { + return err + } + if maj != cbg.MajUnsignedInt { + return fmt.Errorf("wrong type for uint64 field") + } + typed := multistore.StoreID(extra) + t.StoreID = &typed + } + } // t.ChannelID (datatransfer.ChannelID) (struct) @@ -1186,7 +1225,7 @@ func (t *ClientDealState) UnmarshalCBOR(r io.Reader) error { return nil } -var lengthBufProviderDealState = []byte{137} +var lengthBufProviderDealState = []byte{138} func (t *ProviderDealState) MarshalCBOR(w io.Writer) error { if t == nil { @@ -1204,6 +1243,12 @@ func (t *ProviderDealState) MarshalCBOR(w io.Writer) error { return err } + // t.StoreID (multistore.StoreID) (uint64) + + if err := cbg.WriteMajorTypeHeaderBuf(scratch, w, cbg.MajUnsignedInt, uint64(t.StoreID)); err != nil { + return err + } + // t.ChannelID (datatransfer.ChannelID) (struct) if err := t.ChannelID.MarshalCBOR(w); err != nil { return err @@ -1278,7 +1323,7 @@ func (t *ProviderDealState) UnmarshalCBOR(r io.Reader) error { return fmt.Errorf("cbor input should be of type array") } - if extra != 9 { + if extra != 10 { return fmt.Errorf("cbor input had wrong number of fields") } @@ -1290,6 +1335,20 @@ func (t *ProviderDealState) UnmarshalCBOR(r io.Reader) error { return xerrors.Errorf("unmarshaling t.DealProposal: %w", err) } + } + // t.StoreID (multistore.StoreID) (uint64) + + { + + maj, extra, err = cbg.CborReadHeaderBuf(br, scratch) + if err != nil { + return err + } + if maj != cbg.MajUnsignedInt { + return fmt.Errorf("wrong type for uint64 field") + } + t.StoreID = multistore.StoreID(extra) + } // t.ChannelID (datatransfer.ChannelID) (struct) diff --git a/shared_testutil/mocknet.go b/shared_testutil/mocknet.go index 869327fd..e090cf27 100644 --- a/shared_testutil/mocknet.go +++ b/shared_testutil/mocknet.go @@ -247,6 +247,26 @@ func (ltd *Libp2pTestData) VerifyFileTransferred(t *testing.T, link ipld.Link, u } else { dagService = ltd.DagService1 } + ltd.verifyFileTransferred(t, link, dagService, readLen) +} + +// VerifyFileTransferredIntoStore checks that the fixture file was sent from one node to the other, into the store specified by +// storeID +func (ltd *Libp2pTestData) VerifyFileTransferredIntoStore(t *testing.T, link ipld.Link, storeID multistore.StoreID, useSecondNode bool, readLen uint64) { + var dagService ipldformat.DAGService + if useSecondNode { + store, err := ltd.MultiStore2.Get(storeID) + require.NoError(t, err) + dagService = store.DAG + } else { + store, err := ltd.MultiStore1.Get(storeID) + require.NoError(t, err) + dagService = store.DAG + } + ltd.verifyFileTransferred(t, link, dagService, readLen) +} + +func (ltd *Libp2pTestData) verifyFileTransferred(t *testing.T, link ipld.Link, dagService ipldformat.DAGService, readLen uint64) { c := link.(cidlink.Link).Cid diff --git a/shared_testutil/test_network_types.go b/shared_testutil/test_network_types.go index 0d1a6cef..92159a83 100644 --- a/shared_testutil/test_network_types.go +++ b/shared_testutil/test_network_types.go @@ -189,6 +189,11 @@ func (trmn *TestRetrievalMarketNetwork) StopHandlingRequests() error { return nil } +// ID returns the peer id of this host (empty peer ID in test) +func (trmn *TestRetrievalMarketNetwork) ID() peer.ID { + return peer.ID("") +} + var _ rmnet.RetrievalMarketNetwork = &TestRetrievalMarketNetwork{} // Some convenience builders