diff --git a/docs/retrievalclient.mmd b/docs/retrievalclient.mmd index 31c3f7ce..387716a8 100644 --- a/docs/retrievalclient.mmd +++ b/docs/retrievalclient.mmd @@ -76,6 +76,9 @@ stateDiagram-v2 19 --> 15 : ClientEventComplete 8 --> 17 : + note left of 3 : The following events only record in this state.

ClientEventLastPaymentRequested
ClientEventPaymentRequested
ClientEventAllBlocksReceived
ClientEventBlocksReceived + + note left of 4 : The following events only record in this state.

ClientEventLastPaymentRequested
ClientEventPaymentRequested
ClientEventAllBlocksReceived
ClientEventBlocksReceived @@ -84,6 +87,3 @@ stateDiagram-v2 note left of 6 : The following events only record in this state.

ClientEventLastPaymentRequested
ClientEventPaymentRequested
ClientEventAllBlocksReceived
ClientEventBlocksReceived - - note left of 3 : The following events only record in this state.

ClientEventLastPaymentRequested
ClientEventPaymentRequested
ClientEventAllBlocksReceived
ClientEventBlocksReceived - diff --git a/docs/retrievalclient.mmd.svg b/docs/retrievalclient.mmd.svg index 6a7a3273..6ea06262 100644 --- a/docs/retrievalclient.mmd.svg +++ b/docs/retrievalclient.mmd.svg @@ -1,6 +1,6 @@ -ClientEventOpenClientEventDealProposedClientEventDealRejectedClientEventDealNotFoundClientEventDealAcceptedClientEventPaymentChannelErroredClientEventPaymentChannelErroredClientEventPaymentChannelCreateInitiatedClientEventPaymentChannelAddingFundsClientEventPaymentChannelReadyClientEventPaymentChannelReadyClientEventAllocateLaneErroredClientEventAllocateLaneErroredClientEventPaymentChannelAddFundsErroredClientEventLastPaymentRequestedClientEventLastPaymentRequestedClientEventLastPaymentRequestedClientEventLastPaymentRequestedClientEventPaymentRequestedClientEventPaymentRequestedClientEventPaymentRequestedClientEventUnsealPaymentRequestedClientEventAllBlocksReceivedClientEventAllBlocksReceivedClientEventAllBlocksReceivedClientEventBlocksReceivedClientEventBlocksReceivedClientEventBlocksReceivedClientEventSendFundsClientEventSendFundsClientEventFundsExpendedClientEventFundsExpendedClientEventBadPaymentRequestedClientEventBadPaymentRequestedClientEventCreateVoucherFailedClientEventCreateVoucherFailedClientEventPaymentSentClientEventPaymentSentClientEventComplete<invalid Value>DealStatusNewOn entry runs ProposeDealDealStatusWaitForAcceptanceDealStatusPaymentChannelCreatingOn entry runs WaitForPaymentChannelCreateDealStatusPaymentChannelAddingFundsOn entry runs WaitForPaymentChannelAddFundsDealStatusAcceptedOn entry runs SetupPaymentChannelStartDealStatusFailedOn entry runs CancelDealDealStatusRejectedDealStatusFundsNeededOn entry runs ProcessPaymentRequestedDealStatusSendFundsOn entry runs SendFundsDealStatusSendFundsLastPaymentOn entry runs SendFundsDealStatusOngoingOn entry runs OngoingDealStatusFundsNeededLastPaymentOn entry runs ProcessPaymentRequestedDealStatusCompletedDealStatusDealNotFoundDealStatusErroredDealStatusBlocksCompleteDealStatusFinalizingThe following events are not shown cause they can trigger from any state.ClientEventWriteDealProposalErrored - transitions state to DealStatusErroredClientEventUnknownResponseReceived - transitions state to DealStatusFailedClientEventDataTransferError - transitions state to DealStatusErroredClientEventWriteDealPaymentErrored - transitions state to DealStatusErroredThe following events only record in this state.ClientEventLastPaymentRequestedClientEventPaymentRequestedClientEventAllBlocksReceivedClientEventBlocksReceivedThe following events only record in this state.ClientEventLastPaymentRequestedClientEventPaymentRequestedClientEventAllBlocksReceivedClientEventBlocksReceivedThe following events only record in this state.ClientEventLastPaymentRequestedClientEventPaymentRequestedClientEventAllBlocksReceivedClientEventBlocksReceivedThe following events only record in this state.ClientEventLastPaymentRequestedClientEventPaymentRequestedClientEventAllBlocksReceivedClientEventBlocksReceived \ No newline at end of file diff --git a/retrievalmarket/client.go b/retrievalmarket/client.go index 0a4bf9aa..41bb7c41 100644 --- a/retrievalmarket/client.go +++ b/retrievalmarket/client.go @@ -4,7 +4,6 @@ import ( "context" "github.com/ipfs/go-cid" - "github.com/libp2p/go-libp2p-core/peer" "github.com/filecoin-project/go-address" "github.com/filecoin-project/go-multistore" @@ -35,7 +34,7 @@ type RetrievalClient interface { payloadCID cid.Cid, params Params, totalFunds abi.TokenAmount, - miner peer.ID, + p RetrievalPeer, clientWallet address.Address, minerWallet address.Address, storeID *multistore.StoreID, diff --git a/retrievalmarket/impl/client.go b/retrievalmarket/impl/client.go index a47e3d51..239d2d97 100644 --- a/retrievalmarket/impl/client.go +++ b/retrievalmarket/impl/client.go @@ -133,7 +133,12 @@ the request are. The client a new `RetrievalQueryStream` for the chosen peer ID, and calls WriteQuery on it, which constructs a data-transfer message and writes it to the Query stream. */ -func (c *Client) Query(_ context.Context, p retrievalmarket.RetrievalPeer, payloadCID cid.Cid, params retrievalmarket.QueryParams) (retrievalmarket.QueryResponse, error) { +func (c *Client) Query(ctx context.Context, p retrievalmarket.RetrievalPeer, payloadCID cid.Cid, params retrievalmarket.QueryParams) (retrievalmarket.QueryResponse, error) { + err := c.addMultiaddrs(ctx, p) + if err != nil { + log.Warn(err) + return retrievalmarket.QueryResponseUndefined, err + } s, err := c.network.NewQueryStream(p.ID) if err != nil { log.Warn(err) @@ -181,8 +186,11 @@ From then on, the statemachine controls the deal flow in the client. Other compo Documentation of the client state machine can be found at https://godoc.org/github.com/filecoin-project/go-fil-markets/retrievalmarket/impl/clientstates */ -func (c *Client) Retrieve(ctx context.Context, payloadCID cid.Cid, params retrievalmarket.Params, totalFunds abi.TokenAmount, miner peer.ID, clientWallet address.Address, minerWallet address.Address, storeID *multistore.StoreID) (retrievalmarket.DealID, error) { - var err error +func (c *Client) Retrieve(ctx context.Context, payloadCID cid.Cid, params retrievalmarket.Params, totalFunds abi.TokenAmount, p retrievalmarket.RetrievalPeer, clientWallet address.Address, minerWallet address.Address, storeID *multistore.StoreID) (retrievalmarket.DealID, error) { + err := c.addMultiaddrs(ctx, p) + if err != nil { + return 0, err + } next, err := c.storedCounter.Next() if err != nil { return 0, err @@ -210,7 +218,7 @@ func (c *Client) Retrieve(ctx context.Context, payloadCID cid.Cid, params retrie PaymentRequested: abi.NewTokenAmount(0), FundsSpent: abi.NewTokenAmount(0), Status: retrievalmarket.DealStatusNew, - Sender: miner, + Sender: p.ID, UnsealFundsPaid: big.Zero(), StoreID: storeID, } @@ -235,6 +243,21 @@ func (c *Client) notifySubscribers(eventName fsm.EventName, state fsm.StateType) _ = c.subscribers.Publish(internalEvent{evt, ds}) } +func (c *Client) addMultiaddrs(ctx context.Context, p retrievalmarket.RetrievalPeer) error { + tok, _, err := c.node.GetChainHead(ctx) + if err != nil { + return err + } + maddrs, err := c.node.GetKnownAddresses(ctx, p, tok) + if err != nil { + return err + } + if len(maddrs) > 0 { + c.network.AddAddrs(p.ID, maddrs) + } + return nil +} + // SubscribeToEvents allows another component to listen for events on the RetrievalClient // in order to track deals as they progress through the deal flow func (c *Client) SubscribeToEvents(subscriber retrievalmarket.ClientSubscriber) retrievalmarket.Unsubscribe { diff --git a/retrievalmarket/impl/client_test.go b/retrievalmarket/impl/client_test.go index 5440a09d..c40163d2 100644 --- a/retrievalmarket/impl/client_test.go +++ b/retrievalmarket/impl/client_test.go @@ -95,11 +95,13 @@ func TestClient_Query(t *testing.T) { net := tut.NewTestRetrievalMarketNetwork(tut.TestNetworkParams{ QueryStreamBuilder: tut.ExpectPeerOnQueryStreamBuilder(t, expectedPeer, qsb, "Peers should match"), }) + node := testnodes.NewTestRetrievalClientNode(testnodes.TestRetrievalClientNodeParams{}) + node.ExpectKnownAddresses(rpeer, nil) c, err := retrievalimpl.NewClient( net, multiStore, dt, - testnodes.NewTestRetrievalClientNode(testnodes.TestRetrievalClientNodeParams{}), + node, &tut.TestPeerResolver{}, ds, storedCounter) @@ -109,17 +111,20 @@ func TestClient_Query(t *testing.T) { require.NoError(t, err) assert.NotNil(t, resp) assert.Equal(t, expectedQueryResponse, resp) + node.VerifyExpectations(t) }) t.Run("when the stream returns error, returns error", func(t *testing.T) { net := tut.NewTestRetrievalMarketNetwork(tut.TestNetworkParams{ QueryStreamBuilder: tut.FailNewQueryStream, }) + node := testnodes.NewTestRetrievalClientNode(testnodes.TestRetrievalClientNodeParams{}) + node.ExpectKnownAddresses(rpeer, nil) c, err := retrievalimpl.NewClient( net, multiStore, dt, - testnodes.NewTestRetrievalClientNode(testnodes.TestRetrievalClientNodeParams{}), + node, &tut.TestPeerResolver{}, ds, storedCounter) @@ -127,6 +132,7 @@ func TestClient_Query(t *testing.T) { _, err = c.Query(ctx, rpeer, pcid, retrievalmarket.QueryParams{}) assert.EqualError(t, err, "new query stream failed") + node.VerifyExpectations(t) }) t.Run("when WriteDealStatusRequest fails, returns error", func(t *testing.T) { @@ -142,11 +148,13 @@ func TestClient_Query(t *testing.T) { net := tut.NewTestRetrievalMarketNetwork(tut.TestNetworkParams{ QueryStreamBuilder: qsbuilder, }) + node := testnodes.NewTestRetrievalClientNode(testnodes.TestRetrievalClientNodeParams{}) + node.ExpectKnownAddresses(rpeer, nil) c, err := retrievalimpl.NewClient( net, multiStore, dt, - testnodes.NewTestRetrievalClientNode(testnodes.TestRetrievalClientNodeParams{}), + node, &tut.TestPeerResolver{}, ds, storedCounter) @@ -155,6 +163,7 @@ func TestClient_Query(t *testing.T) { statusCode, err := c.Query(ctx, rpeer, pcid, retrievalmarket.QueryParams{}) assert.EqualError(t, err, "write query failed") assert.Equal(t, retrievalmarket.QueryResponseUndefined, statusCode) + node.VerifyExpectations(t) }) t.Run("when ReadDealStatusResponse fails, returns error", func(t *testing.T) { @@ -168,11 +177,13 @@ func TestClient_Query(t *testing.T) { net := tut.NewTestRetrievalMarketNetwork(tut.TestNetworkParams{ QueryStreamBuilder: qsbuilder, }) + node := testnodes.NewTestRetrievalClientNode(testnodes.TestRetrievalClientNodeParams{}) + node.ExpectKnownAddresses(rpeer, nil) c, err := retrievalimpl.NewClient( net, multiStore, dt, - testnodes.NewTestRetrievalClientNode(testnodes.TestRetrievalClientNodeParams{}), + node, &tut.TestPeerResolver{}, ds, storedCounter) @@ -181,6 +192,7 @@ func TestClient_Query(t *testing.T) { statusCode, err := c.Query(ctx, rpeer, pcid, retrievalmarket.QueryParams{}) assert.EqualError(t, err, "query response failed") assert.Equal(t, retrievalmarket.QueryResponseUndefined, statusCode) + node.VerifyExpectations(t) }) } diff --git a/retrievalmarket/impl/integration_test.go b/retrievalmarket/impl/integration_test.go index 4454d823..e5252993 100644 --- a/retrievalmarket/impl/integration_test.go +++ b/retrievalmarket/impl/integration_test.go @@ -147,6 +147,7 @@ func requireSetupTestClientAndProvider(bgCtx context.Context, t *testing.T, payC Address: paymentAddress, ID: testData.Host2.ID(), } + rcNode1.ExpectKnownAddresses(retrievalPeer, nil) return client, expectedCIDs, missingCID, expectedQR, retrievalPeer, provider } @@ -296,7 +297,7 @@ func TestClientCanMakeDealWithProvider(t *testing.T) { provider := setupProvider(bgCtx, t, testData, payloadCID, pieceInfo, expectedQR, providerPaymentAddr, providerNode, decider) - retrievalPeer := &retrievalmarket.RetrievalPeer{Address: providerPaymentAddr, ID: testData.Host2.ID()} + retrievalPeer := retrievalmarket.RetrievalPeer{Address: providerPaymentAddr, ID: testData.Host2.ID()} expectedVoucher := tut.MakeTestSignedVoucher() @@ -312,9 +313,11 @@ func TestClientCanMakeDealWithProvider(t *testing.T) { // ------- SET UP CLIENT nw1 := rmnet.NewFromLibp2pHost(testData.Host1) - createdChan, newLaneAddr, createdVoucher, client, err := setupClient(bgCtx, t, clientPaymentChannel, expectedVoucher, nw1, testData, testCase.addFunds) + createdChan, newLaneAddr, createdVoucher, clientNode, client, err := setupClient(bgCtx, t, clientPaymentChannel, expectedVoucher, nw1, testData, testCase.addFunds) require.NoError(t, err) + clientNode.ExpectKnownAddresses(retrievalPeer, nil) + clientDealStateChan := make(chan retrievalmarket.ClientDealState) client.SubscribeToEvents(func(event retrievalmarket.ClientEvent, state retrievalmarket.ClientDealState) { switch event { @@ -358,7 +361,7 @@ CurrentInterval: %d }) // **** Send the query for the Piece // set up retrieval params - resp, err := client.Query(bgCtx, *retrievalPeer, payloadCID, retrievalmarket.QueryParams{}) + resp, err := client.Query(bgCtx, retrievalPeer, payloadCID, retrievalmarket.QueryParams{}) require.NoError(t, err) require.Equal(t, retrievalmarket.QueryResponseAvailable, resp.Status) @@ -376,7 +379,7 @@ CurrentInterval: %d clientStoreID = &id } // *** Retrieve the piece - did, err := client.Retrieve(bgCtx, payloadCID, rmParams, expectedTotal, retrievalPeer.ID, clientPaymentChannel, retrievalPeer.Address, clientStoreID) + did, err := client.Retrieve(bgCtx, payloadCID, rmParams, expectedTotal, retrievalPeer, clientPaymentChannel, retrievalPeer.Address, clientStoreID) assert.Equal(t, did, retrievalmarket.DealID(0)) require.NoError(t, err) @@ -416,7 +419,8 @@ CurrentInterval: %d if testCase.decider != nil { assert.True(t, customDeciderRan) } - // verify that the provider saved the same voucher values + // verify that the nodes we interacted with as expected + clientNode.VerifyExpectations(t) providerNode.VerifyExpectations(t) if testCase.skipStores { testData.VerifyFileTransferred(t, pieceLink, false, testCase.filesize) @@ -440,6 +444,7 @@ func setupClient( *pmtChan, *address.Address, *paych.SignedVoucher, + *testnodes.TestRetrievalClientNode, retrievalmarket.RetrievalClient, error) { var createdChan pmtChan @@ -475,7 +480,7 @@ func setupClient( require.NoError(t, err) client, err := retrievalimpl.NewClient(nw1, testData.MultiStore1, dt1, clientNode, &tut.TestPeerResolver{}, testData.Ds1, testData.RetrievalStoredCounter1) - return &createdChan, &newLaneAddr, &createdVoucher, client, err + return &createdChan, &newLaneAddr, &createdVoucher, clientNode, client, err } func setupProvider( diff --git a/retrievalmarket/impl/testnodes/test_retrieval_client_node.go b/retrievalmarket/impl/testnodes/test_retrieval_client_node.go index f690b8fa..20ba2618 100644 --- a/retrievalmarket/impl/testnodes/test_retrieval_client_node.go +++ b/retrievalmarket/impl/testnodes/test_retrieval_client_node.go @@ -2,9 +2,13 @@ package testnodes import ( "context" + "errors" "fmt" + "testing" "github.com/ipfs/go-cid" + ma "github.com/multiformats/go-multiaddr" + "github.com/stretchr/testify/require" "github.com/filecoin-project/go-address" "github.com/filecoin-project/specs-actors/actors/abi" @@ -25,10 +29,12 @@ type TestRetrievalClientNode struct { laneError error voucher *paych.SignedVoucher voucherError, waitCreateErr, waitAddErr error - - allocateLaneRecorder func(address.Address) - createPaymentVoucherRecorder func(voucher *paych.SignedVoucher) - getCreatePaymentChannelRecorder func(address.Address, address.Address, abi.TokenAmount) + knownAddreses map[retrievalmarket.RetrievalPeer][]ma.Multiaddr + receivedKnownAddresses map[retrievalmarket.RetrievalPeer]struct{} + expectedKnownAddresses map[retrievalmarket.RetrievalPeer]struct{} + allocateLaneRecorder func(address.Address) + createPaymentVoucherRecorder func(voucher *paych.SignedVoucher) + getCreatePaymentChannelRecorder func(address.Address, address.Address, abi.TokenAmount) } // TestRetrievalClientNodeParams are parameters for initializing a TestRetrievalClientNode @@ -66,6 +72,9 @@ func NewTestRetrievalClientNode(params TestRetrievalClientNodeParams) *TestRetri getCreatePaymentChannelRecorder: params.PaymentChannelRecorder, createPaychMsgCID: params.CreatePaychCID, addFundsMsgCID: params.AddFundsCID, + knownAddreses: map[retrievalmarket.RetrievalPeer][]ma.Multiaddr{}, + expectedKnownAddresses: map[retrievalmarket.RetrievalPeer]struct{}{}, + receivedKnownAddresses: map[retrievalmarket.RetrievalPeer]struct{}{}, } } @@ -119,3 +128,25 @@ func (trcn *TestRetrievalClientNode) WaitForPaymentChannelCreation(messageCID ci } return trcn.payCh, trcn.waitCreateErr } + +// ExpectKnownAddresses stubs a return for a look up of known addresses for the given retrieval peer +// and the fact that it was looked up is verified with VerifyExpectations +func (trcn *TestRetrievalClientNode) ExpectKnownAddresses(p retrievalmarket.RetrievalPeer, maddrs []ma.Multiaddr) { + trcn.expectedKnownAddresses[p] = struct{}{} + trcn.knownAddreses[p] = maddrs +} + +// GetKnownAddresses gets any on known multiaddrs for a given address, so we can add to the peer store +func (trcn *TestRetrievalClientNode) GetKnownAddresses(ctx context.Context, p retrievalmarket.RetrievalPeer, tok shared.TipSetToken) ([]ma.Multiaddr, error) { + trcn.receivedKnownAddresses[p] = struct{}{} + addrs, ok := trcn.knownAddreses[p] + if !ok { + return nil, errors.New("Provider not found") + } + return addrs, nil +} + +// VerifyExpectations verifies that all expected known addresses were looked up +func (trcn *TestRetrievalClientNode) VerifyExpectations(t *testing.T) { + require.Equal(t, trcn.expectedKnownAddresses, trcn.receivedKnownAddresses) +} diff --git a/retrievalmarket/network/libp2p_impl.go b/retrievalmarket/network/libp2p_impl.go index b5885b07..95d66d29 100644 --- a/retrievalmarket/network/libp2p_impl.go +++ b/retrievalmarket/network/libp2p_impl.go @@ -3,11 +3,13 @@ package network import ( "bufio" "context" + "time" logging "github.com/ipfs/go-log/v2" "github.com/libp2p/go-libp2p-core/host" "github.com/libp2p/go-libp2p-core/network" "github.com/libp2p/go-libp2p-core/peer" + ma "github.com/multiformats/go-multiaddr" "github.com/filecoin-project/go-fil-markets/retrievalmarket" ) @@ -71,3 +73,7 @@ func (impl *libp2pRetrievalMarketNetwork) handleNewQueryStream(s network.Stream) func (impl *libp2pRetrievalMarketNetwork) ID() peer.ID { return impl.host.ID() } + +func (impl *libp2pRetrievalMarketNetwork) AddAddrs(p peer.ID, addrs []ma.Multiaddr) { + impl.host.Peerstore().AddAddrs(p, addrs, 8*time.Hour) +} diff --git a/retrievalmarket/network/network.go b/retrievalmarket/network/network.go index e0bec417..c1c1bebe 100644 --- a/retrievalmarket/network/network.go +++ b/retrievalmarket/network/network.go @@ -2,6 +2,7 @@ package network import ( "github.com/libp2p/go-libp2p-core/peer" + ma "github.com/multiformats/go-multiaddr" "github.com/filecoin-project/go-fil-markets/retrievalmarket" ) @@ -42,4 +43,7 @@ type RetrievalMarketNetwork interface { // ID returns the peer id of the host for this network ID() peer.ID + + // AddAddrs adds the given multi-addrs to the peerstore for the passed peer ID + AddAddrs(peer.ID, []ma.Multiaddr) } diff --git a/retrievalmarket/nodes.go b/retrievalmarket/nodes.go index 095389c2..cd0d0b80 100644 --- a/retrievalmarket/nodes.go +++ b/retrievalmarket/nodes.go @@ -5,6 +5,7 @@ import ( "io" "github.com/ipfs/go-cid" + ma "github.com/multiformats/go-multiaddr" "github.com/filecoin-project/go-address" "github.com/filecoin-project/specs-actors/actors/abi" @@ -40,6 +41,9 @@ type RetrievalClientNode interface { // WaitForPaymentChannelCreation waits for a message on chain that a // payment channel has been created WaitForPaymentChannelCreation(messageCID cid.Cid) (address.Address, error) + + // GetKnownAddresses gets any on known multiaddrs for a given address, so we can add to the peer store + GetKnownAddresses(ctx context.Context, p RetrievalPeer, tok shared.TipSetToken) ([]ma.Multiaddr, error) } // RetrievalProviderNode are the node depedencies for a RetrevalProvider diff --git a/retrievalmarket/storage_retrieval_integration_test.go b/retrievalmarket/storage_retrieval_integration_test.go index 3279791c..214f44de 100644 --- a/retrievalmarket/storage_retrieval_integration_test.go +++ b/retrievalmarket/storage_retrieval_integration_test.go @@ -145,6 +145,8 @@ func TestStorageRetrieval(t *testing.T) { retrievalPeer := peers[0] require.NotNil(t, retrievalPeer.PieceCID) + rh.ClientNode.ExpectKnownAddresses(retrievalPeer, nil) + resp, err := rh.Client.Query(bgCtx, retrievalPeer, sh.PayloadCid, retrievalmarket.QueryParams{}) require.NoError(t, err) require.Equal(t, retrievalmarket.QueryResponseAvailable, resp.Status) @@ -165,7 +167,7 @@ func TestStorageRetrieval(t *testing.T) { // *** Retrieve the piece clientStoreID := sh.TestData.MultiStore1.Next() - did, err := rh.Client.Retrieve(bgCtx, sh.PayloadCid, rmParams, expectedTotal, retrievalPeer.ID, *rh.ExpPaych, retrievalPeer.Address, &clientStoreID) + did, err := rh.Client.Retrieve(bgCtx, sh.PayloadCid, rmParams, expectedTotal, retrievalPeer, *rh.ExpPaych, retrievalPeer.Address, &clientStoreID) assert.Equal(t, did, retrievalmarket.DealID(0)) require.NoError(t, err) @@ -194,6 +196,7 @@ func TestStorageRetrieval(t *testing.T) { require.Equal(t, retrievalmarket.DealStatusCompleted, providerDealState.Status) require.Equal(t, retrievalmarket.DealStatusCompleted, clientDealState.Status) + rh.ClientNode.VerifyExpectations(t) sh.TestData.VerifyFileTransferredIntoStore(t, sh.PieceLink, clientStoreID, false, uint64(fsize)) } @@ -319,7 +322,7 @@ func newStorageHarness(ctx context.Context, t *testing.T) *storageHarness { PeerID: td.Host2.ID(), } - smState.Providers = []*storagemarket.StorageProviderInfo{&providerInfo} + smState.Providers = map[address.Address]*storagemarket.StorageProviderInfo{providerAddr: &providerInfo} return &storageHarness{ Ctx: ctx, Epoch: epoch, diff --git a/shared_testutil/test_network_types.go b/shared_testutil/test_network_types.go index 92159a83..02eaf46a 100644 --- a/shared_testutil/test_network_types.go +++ b/shared_testutil/test_network_types.go @@ -6,6 +6,7 @@ import ( "github.com/ipfs/go-cid" "github.com/libp2p/go-libp2p-core/peer" + ma "github.com/multiformats/go-multiaddr" "github.com/stretchr/testify/require" rm "github.com/filecoin-project/go-fil-markets/retrievalmarket" @@ -194,6 +195,10 @@ func (trmn *TestRetrievalMarketNetwork) ID() peer.ID { return peer.ID("") } +// AddAddrs does nothing in test +func (trmn *TestRetrievalMarketNetwork) AddAddrs(peer.ID, []ma.Multiaddr) { +} + var _ rmnet.RetrievalMarketNetwork = &TestRetrievalMarketNetwork{} // Some convenience builders diff --git a/storagemarket/impl/client.go b/storagemarket/impl/client.go index c9136446..a5306e35 100644 --- a/storagemarket/impl/client.go +++ b/storagemarket/impl/client.go @@ -124,7 +124,7 @@ func NewClient( // in progress deals func (c *Client) Start(ctx context.Context) error { go func() { - err := c.restartDeals() + err := c.restartDeals(ctx) if err != nil { log.Errorf("Failed to restart deals: %s", err.Error()) } @@ -320,6 +320,11 @@ its implementation of the Client FSM's ClientDealEnvironment. Documentation of the client state machine can be found at https://godoc.org/github.com/filecoin-project/go-fil-markets/storagemarket/impl/clientstates */ func (c *Client) ProposeStorageDeal(ctx context.Context, params storagemarket.ProposeStorageDealParams) (*storagemarket.ProposeStorageDealResult, error) { + err := c.addMultiaddrs(ctx, params.Info.Address) + if err != nil { + return nil, xerrors.Errorf("looking up addresses: %w", err) + } + commP, pieceSize, err := clientutils.CommP(ctx, c.pio, params.Rt, params.Data, params.StoreID) if err != nil { return nil, xerrors.Errorf("computing commP failed: %w", err) @@ -449,7 +454,7 @@ func (c *Client) Configure(options ...StorageClientOption) { } } -func (c *Client) restartDeals() error { +func (c *Client) restartDeals(ctx context.Context) error { var deals []storagemarket.ClientDeal err := c.statemachines.List(&deals) if err != nil { @@ -457,6 +462,15 @@ func (c *Client) restartDeals() error { } for _, deal := range deals { + if c.statemachines.IsTerminated(deal) { + continue + } + + err = c.addMultiaddrs(ctx, deal.Proposal.Provider) + if err != nil { + return err + } + err = c.statemachines.Send(deal.ProposalCid, storagemarket.ClientEventRestart) if err != nil { return err @@ -500,6 +514,23 @@ func (c *Client) verifyStatusResponseSignature(ctx context.Context, miner addres return valid, nil } +func (c *Client) addMultiaddrs(ctx context.Context, providerAddr address.Address) error { + tok, _, err := c.node.GetChainHead(ctx) + if err != nil { + return err + } + minfo, err := c.node.GetMinerInfo(ctx, providerAddr, tok) + if err != nil { + return err + } + + if len(minfo.Addrs) > 0 { + c.net.AddAddrs(minfo.PeerID, minfo.Addrs) + } + + return nil +} + func newClientStateMachine(ds datastore.Datastore, env fsm.Environment, notifier fsm.Notifier) (fsm.Group, error) { return fsm.New(ds, fsm.Parameters{ Environment: env, diff --git a/storagemarket/integration_test.go b/storagemarket/integration_test.go index 71bbee1c..fe05cca7 100644 --- a/storagemarket/integration_test.go +++ b/storagemarket/integration_test.go @@ -177,6 +177,7 @@ func TestMakeDeal(t *testing.T) { // ensure that the handoff has fast retrieval info assert.Len(t, h.ProviderNode.OnDealCompleteCalls, 1) assert.True(t, h.ProviderNode.OnDealCompleteCalls[0].FastRetrieval) + h.ClientNode.VerifyExpectations(t) }) } } @@ -383,8 +384,9 @@ func newHarnessWithTestData(t *testing.T, ctx context.Context, td *shared_testut payloadCid := rootLink.(cidlink.Link).Cid clientNode := testnodes.FakeClientNode{ - FakeCommonNode: testnodes.FakeCommonNode{SMState: smState}, - ClientAddr: address.TestAddress, + FakeCommonNode: testnodes.FakeCommonNode{SMState: smState}, + ClientAddr: address.TestAddress, + ExpectedMinerInfos: []address.Address{address.TestAddress2}, } expDealID := abi.DealID(rand.Uint64()) @@ -476,7 +478,7 @@ func newHarnessWithTestData(t *testing.T, ctx context.Context, td *shared_testut PeerID: td.Host2.ID(), } - smState.Providers = []*storagemarket.StorageProviderInfo{&providerInfo} + smState.Providers = map[address.Address]*storagemarket.StorageProviderInfo{providerAddr: &providerInfo} return &harness{ Ctx: ctx, Epoch: epoch, diff --git a/storagemarket/network/libp2p_impl.go b/storagemarket/network/libp2p_impl.go index c83583b2..8bb5b1fa 100644 --- a/storagemarket/network/libp2p_impl.go +++ b/storagemarket/network/libp2p_impl.go @@ -112,5 +112,5 @@ func (impl *libp2pStorageMarketNetwork) ID() peer.ID { } func (impl *libp2pStorageMarketNetwork) AddAddrs(p peer.ID, addrs []ma.Multiaddr) { - impl.host.Peerstore().AddAddrs(p, addrs, time.Minute*10) + impl.host.Peerstore().AddAddrs(p, addrs, 8*time.Hour) } diff --git a/storagemarket/testnodes/testnodes.go b/storagemarket/testnodes/testnodes.go index cfaee951..0a851d40 100644 --- a/storagemarket/testnodes/testnodes.go +++ b/storagemarket/testnodes/testnodes.go @@ -7,8 +7,10 @@ import ( "errors" "io" "io/ioutil" + "testing" "github.com/ipfs/go-cid" + "github.com/stretchr/testify/require" "github.com/filecoin-project/go-address" "github.com/filecoin-project/specs-actors/actors/abi" @@ -33,7 +35,7 @@ type StorageMarketState struct { DealID abi.DealID Balances map[address.Address]abi.TokenAmount StorageDeals map[address.Address][]storagemarket.StorageDeal - Providers []*storagemarket.StorageProviderInfo + Providers map[address.Address]*storagemarket.StorageProviderInfo } // NewStorageMarketState returns a new empty state for the storage market @@ -43,7 +45,7 @@ func NewStorageMarketState() *StorageMarketState { DealID: 0, Balances: map[address.Address]abi.TokenAmount{}, StorageDeals: map[address.Address][]storagemarket.StorageDeal{}, - Providers: nil, + Providers: map[address.Address]*storagemarket.StorageProviderInfo{}, } } @@ -229,6 +231,8 @@ type FakeClientNode struct { ValidationError error ValidatePublishedDealID abi.DealID ValidatePublishedError error + ExpectedMinerInfos []address.Address + receivedMinerInfos []address.Address } // ListClientDeals just returns the deals in the storage market state @@ -238,7 +242,11 @@ func (n *FakeClientNode) ListClientDeals(ctx context.Context, addr address.Addre // ListStorageProviders lists the providers in the storage market state func (n *FakeClientNode) ListStorageProviders(ctx context.Context, tok shared.TipSetToken) ([]*storagemarket.StorageProviderInfo, error) { - return n.SMState.Providers, nil + providers := make([]*storagemarket.StorageProviderInfo, 0, len(n.SMState.Providers)) + for _, provider := range n.SMState.Providers { + providers = append(providers, provider) + } + return providers, nil } // ValidatePublishedDeal always succeeds @@ -261,10 +269,12 @@ func (n *FakeClientNode) GetDefaultWalletAddress(ctx context.Context) (address.A // GetMinerInfo returns stubbed information for the first miner in storage market state func (n *FakeClientNode) GetMinerInfo(ctx context.Context, maddr address.Address, tok shared.TipSetToken) (*storagemarket.StorageProviderInfo, error) { - if len(n.SMState.Providers) == 0 { + n.receivedMinerInfos = append(n.receivedMinerInfos, maddr) + info, ok := n.SMState.Providers[maddr] + if !ok { return nil, errors.New("Provider not found") } - return n.SMState.Providers[0], nil + return info, nil } // ValidateAskSignature returns the stubbed validation error and a boolean value @@ -273,6 +283,10 @@ func (n *FakeClientNode) ValidateAskSignature(ctx context.Context, ask *storagem return n.ValidationError == nil, n.ValidationError } +func (n *FakeClientNode) VerifyExpectations(t *testing.T) { + require.Equal(t, n.ExpectedMinerInfos, n.receivedMinerInfos) +} + var _ storagemarket.StorageClientNode = (*FakeClientNode)(nil) // FakeProviderNode implements functions specific to the StorageProviderNode