Skip to content

Commit

Permalink
Properly add multiaddrs to avoid dialing issues (#356)
Browse files Browse the repository at this point in the history
* feat(network): add multiaddrs properly

Add multiaddrs properly in storage market, retrieval market

* fix(imports): fix imports check
  • Loading branch information
hannahhoward authored Aug 5, 2020
1 parent 66c8939 commit 5f571e5
Show file tree
Hide file tree
Showing 16 changed files with 178 additions and 39 deletions.
6 changes: 3 additions & 3 deletions docs/retrievalclient.mmd
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,9 @@ stateDiagram-v2
19 --> 15 : ClientEventComplete
8 --> 17 : <invalid Value>

note left of 3 : The following events only record in this state.<br><br>ClientEventLastPaymentRequested<br>ClientEventPaymentRequested<br>ClientEventAllBlocksReceived<br>ClientEventBlocksReceived


note left of 4 : The following events only record in this state.<br><br>ClientEventLastPaymentRequested<br>ClientEventPaymentRequested<br>ClientEventAllBlocksReceived<br>ClientEventBlocksReceived


Expand All @@ -84,6 +87,3 @@ stateDiagram-v2

note left of 6 : The following events only record in this state.<br><br>ClientEventLastPaymentRequested<br>ClientEventPaymentRequested<br>ClientEventAllBlocksReceived<br>ClientEventBlocksReceived


note left of 3 : The following events only record in this state.<br><br>ClientEventLastPaymentRequested<br>ClientEventPaymentRequested<br>ClientEventAllBlocksReceived<br>ClientEventBlocksReceived

6 changes: 3 additions & 3 deletions docs/retrievalclient.mmd.svg
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
3 changes: 1 addition & 2 deletions retrievalmarket/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"

"github.com/ipfs/go-cid"
"github.com/libp2p/go-libp2p-core/peer"

"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-multistore"
Expand Down Expand Up @@ -35,7 +34,7 @@ type RetrievalClient interface {
payloadCID cid.Cid,
params Params,
totalFunds abi.TokenAmount,
miner peer.ID,
p RetrievalPeer,
clientWallet address.Address,
minerWallet address.Address,
storeID *multistore.StoreID,
Expand Down
31 changes: 27 additions & 4 deletions retrievalmarket/impl/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,12 @@ the request are.
The client a new `RetrievalQueryStream` for the chosen peer ID,
and calls WriteQuery on it, which constructs a data-transfer message and writes it to the Query stream.
*/
func (c *Client) Query(_ context.Context, p retrievalmarket.RetrievalPeer, payloadCID cid.Cid, params retrievalmarket.QueryParams) (retrievalmarket.QueryResponse, error) {
func (c *Client) Query(ctx context.Context, p retrievalmarket.RetrievalPeer, payloadCID cid.Cid, params retrievalmarket.QueryParams) (retrievalmarket.QueryResponse, error) {
err := c.addMultiaddrs(ctx, p)
if err != nil {
log.Warn(err)
return retrievalmarket.QueryResponseUndefined, err
}
s, err := c.network.NewQueryStream(p.ID)
if err != nil {
log.Warn(err)
Expand Down Expand Up @@ -181,8 +186,11 @@ From then on, the statemachine controls the deal flow in the client. Other compo
Documentation of the client state machine can be found at https://godoc.org/github.com/filecoin-project/go-fil-markets/retrievalmarket/impl/clientstates
*/
func (c *Client) Retrieve(ctx context.Context, payloadCID cid.Cid, params retrievalmarket.Params, totalFunds abi.TokenAmount, miner peer.ID, clientWallet address.Address, minerWallet address.Address, storeID *multistore.StoreID) (retrievalmarket.DealID, error) {
var err error
func (c *Client) Retrieve(ctx context.Context, payloadCID cid.Cid, params retrievalmarket.Params, totalFunds abi.TokenAmount, p retrievalmarket.RetrievalPeer, clientWallet address.Address, minerWallet address.Address, storeID *multistore.StoreID) (retrievalmarket.DealID, error) {
err := c.addMultiaddrs(ctx, p)
if err != nil {
return 0, err
}
next, err := c.storedCounter.Next()
if err != nil {
return 0, err
Expand Down Expand Up @@ -210,7 +218,7 @@ func (c *Client) Retrieve(ctx context.Context, payloadCID cid.Cid, params retrie
PaymentRequested: abi.NewTokenAmount(0),
FundsSpent: abi.NewTokenAmount(0),
Status: retrievalmarket.DealStatusNew,
Sender: miner,
Sender: p.ID,
UnsealFundsPaid: big.Zero(),
StoreID: storeID,
}
Expand All @@ -235,6 +243,21 @@ func (c *Client) notifySubscribers(eventName fsm.EventName, state fsm.StateType)
_ = c.subscribers.Publish(internalEvent{evt, ds})
}

func (c *Client) addMultiaddrs(ctx context.Context, p retrievalmarket.RetrievalPeer) error {
tok, _, err := c.node.GetChainHead(ctx)
if err != nil {
return err
}
maddrs, err := c.node.GetKnownAddresses(ctx, p, tok)
if err != nil {
return err
}
if len(maddrs) > 0 {
c.network.AddAddrs(p.ID, maddrs)
}
return nil
}

// SubscribeToEvents allows another component to listen for events on the RetrievalClient
// in order to track deals as they progress through the deal flow
func (c *Client) SubscribeToEvents(subscriber retrievalmarket.ClientSubscriber) retrievalmarket.Unsubscribe {
Expand Down
20 changes: 16 additions & 4 deletions retrievalmarket/impl/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,11 +95,13 @@ func TestClient_Query(t *testing.T) {
net := tut.NewTestRetrievalMarketNetwork(tut.TestNetworkParams{
QueryStreamBuilder: tut.ExpectPeerOnQueryStreamBuilder(t, expectedPeer, qsb, "Peers should match"),
})
node := testnodes.NewTestRetrievalClientNode(testnodes.TestRetrievalClientNodeParams{})
node.ExpectKnownAddresses(rpeer, nil)
c, err := retrievalimpl.NewClient(
net,
multiStore,
dt,
testnodes.NewTestRetrievalClientNode(testnodes.TestRetrievalClientNodeParams{}),
node,
&tut.TestPeerResolver{},
ds,
storedCounter)
Expand All @@ -109,24 +111,28 @@ func TestClient_Query(t *testing.T) {
require.NoError(t, err)
assert.NotNil(t, resp)
assert.Equal(t, expectedQueryResponse, resp)
node.VerifyExpectations(t)
})

t.Run("when the stream returns error, returns error", func(t *testing.T) {
net := tut.NewTestRetrievalMarketNetwork(tut.TestNetworkParams{
QueryStreamBuilder: tut.FailNewQueryStream,
})
node := testnodes.NewTestRetrievalClientNode(testnodes.TestRetrievalClientNodeParams{})
node.ExpectKnownAddresses(rpeer, nil)
c, err := retrievalimpl.NewClient(
net,
multiStore,
dt,
testnodes.NewTestRetrievalClientNode(testnodes.TestRetrievalClientNodeParams{}),
node,
&tut.TestPeerResolver{},
ds,
storedCounter)
require.NoError(t, err)

_, err = c.Query(ctx, rpeer, pcid, retrievalmarket.QueryParams{})
assert.EqualError(t, err, "new query stream failed")
node.VerifyExpectations(t)
})

t.Run("when WriteDealStatusRequest fails, returns error", func(t *testing.T) {
Expand All @@ -142,11 +148,13 @@ func TestClient_Query(t *testing.T) {
net := tut.NewTestRetrievalMarketNetwork(tut.TestNetworkParams{
QueryStreamBuilder: qsbuilder,
})
node := testnodes.NewTestRetrievalClientNode(testnodes.TestRetrievalClientNodeParams{})
node.ExpectKnownAddresses(rpeer, nil)
c, err := retrievalimpl.NewClient(
net,
multiStore,
dt,
testnodes.NewTestRetrievalClientNode(testnodes.TestRetrievalClientNodeParams{}),
node,
&tut.TestPeerResolver{},
ds,
storedCounter)
Expand All @@ -155,6 +163,7 @@ func TestClient_Query(t *testing.T) {
statusCode, err := c.Query(ctx, rpeer, pcid, retrievalmarket.QueryParams{})
assert.EqualError(t, err, "write query failed")
assert.Equal(t, retrievalmarket.QueryResponseUndefined, statusCode)
node.VerifyExpectations(t)
})

t.Run("when ReadDealStatusResponse fails, returns error", func(t *testing.T) {
Expand All @@ -168,11 +177,13 @@ func TestClient_Query(t *testing.T) {
net := tut.NewTestRetrievalMarketNetwork(tut.TestNetworkParams{
QueryStreamBuilder: qsbuilder,
})
node := testnodes.NewTestRetrievalClientNode(testnodes.TestRetrievalClientNodeParams{})
node.ExpectKnownAddresses(rpeer, nil)
c, err := retrievalimpl.NewClient(
net,
multiStore,
dt,
testnodes.NewTestRetrievalClientNode(testnodes.TestRetrievalClientNodeParams{}),
node,
&tut.TestPeerResolver{},
ds,
storedCounter)
Expand All @@ -181,6 +192,7 @@ func TestClient_Query(t *testing.T) {
statusCode, err := c.Query(ctx, rpeer, pcid, retrievalmarket.QueryParams{})
assert.EqualError(t, err, "query response failed")
assert.Equal(t, retrievalmarket.QueryResponseUndefined, statusCode)
node.VerifyExpectations(t)
})
}

Expand Down
17 changes: 11 additions & 6 deletions retrievalmarket/impl/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ func requireSetupTestClientAndProvider(bgCtx context.Context, t *testing.T, payC
Address: paymentAddress,
ID: testData.Host2.ID(),
}
rcNode1.ExpectKnownAddresses(retrievalPeer, nil)
return client, expectedCIDs, missingCID, expectedQR, retrievalPeer, provider
}

Expand Down Expand Up @@ -296,7 +297,7 @@ func TestClientCanMakeDealWithProvider(t *testing.T) {
provider := setupProvider(bgCtx, t, testData, payloadCID, pieceInfo, expectedQR,
providerPaymentAddr, providerNode, decider)

retrievalPeer := &retrievalmarket.RetrievalPeer{Address: providerPaymentAddr, ID: testData.Host2.ID()}
retrievalPeer := retrievalmarket.RetrievalPeer{Address: providerPaymentAddr, ID: testData.Host2.ID()}

expectedVoucher := tut.MakeTestSignedVoucher()

Expand All @@ -312,9 +313,11 @@ func TestClientCanMakeDealWithProvider(t *testing.T) {

// ------- SET UP CLIENT
nw1 := rmnet.NewFromLibp2pHost(testData.Host1)
createdChan, newLaneAddr, createdVoucher, client, err := setupClient(bgCtx, t, clientPaymentChannel, expectedVoucher, nw1, testData, testCase.addFunds)
createdChan, newLaneAddr, createdVoucher, clientNode, client, err := setupClient(bgCtx, t, clientPaymentChannel, expectedVoucher, nw1, testData, testCase.addFunds)
require.NoError(t, err)

clientNode.ExpectKnownAddresses(retrievalPeer, nil)

clientDealStateChan := make(chan retrievalmarket.ClientDealState)
client.SubscribeToEvents(func(event retrievalmarket.ClientEvent, state retrievalmarket.ClientDealState) {
switch event {
Expand Down Expand Up @@ -358,7 +361,7 @@ CurrentInterval: %d
})
// **** Send the query for the Piece
// set up retrieval params
resp, err := client.Query(bgCtx, *retrievalPeer, payloadCID, retrievalmarket.QueryParams{})
resp, err := client.Query(bgCtx, retrievalPeer, payloadCID, retrievalmarket.QueryParams{})
require.NoError(t, err)
require.Equal(t, retrievalmarket.QueryResponseAvailable, resp.Status)

Expand All @@ -376,7 +379,7 @@ CurrentInterval: %d
clientStoreID = &id
}
// *** Retrieve the piece
did, err := client.Retrieve(bgCtx, payloadCID, rmParams, expectedTotal, retrievalPeer.ID, clientPaymentChannel, retrievalPeer.Address, clientStoreID)
did, err := client.Retrieve(bgCtx, payloadCID, rmParams, expectedTotal, retrievalPeer, clientPaymentChannel, retrievalPeer.Address, clientStoreID)
assert.Equal(t, did, retrievalmarket.DealID(0))
require.NoError(t, err)

Expand Down Expand Up @@ -416,7 +419,8 @@ CurrentInterval: %d
if testCase.decider != nil {
assert.True(t, customDeciderRan)
}
// verify that the provider saved the same voucher values
// verify that the nodes we interacted with as expected
clientNode.VerifyExpectations(t)
providerNode.VerifyExpectations(t)
if testCase.skipStores {
testData.VerifyFileTransferred(t, pieceLink, false, testCase.filesize)
Expand All @@ -440,6 +444,7 @@ func setupClient(
*pmtChan,
*address.Address,
*paych.SignedVoucher,
*testnodes.TestRetrievalClientNode,
retrievalmarket.RetrievalClient,
error) {
var createdChan pmtChan
Expand Down Expand Up @@ -475,7 +480,7 @@ func setupClient(
require.NoError(t, err)

client, err := retrievalimpl.NewClient(nw1, testData.MultiStore1, dt1, clientNode, &tut.TestPeerResolver{}, testData.Ds1, testData.RetrievalStoredCounter1)
return &createdChan, &newLaneAddr, &createdVoucher, client, err
return &createdChan, &newLaneAddr, &createdVoucher, clientNode, client, err
}

func setupProvider(
Expand Down
39 changes: 35 additions & 4 deletions retrievalmarket/impl/testnodes/test_retrieval_client_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,13 @@ package testnodes

import (
"context"
"errors"
"fmt"
"testing"

"github.com/ipfs/go-cid"
ma "github.com/multiformats/go-multiaddr"
"github.com/stretchr/testify/require"

"github.com/filecoin-project/go-address"
"github.com/filecoin-project/specs-actors/actors/abi"
Expand All @@ -25,10 +29,12 @@ type TestRetrievalClientNode struct {
laneError error
voucher *paych.SignedVoucher
voucherError, waitCreateErr, waitAddErr error

allocateLaneRecorder func(address.Address)
createPaymentVoucherRecorder func(voucher *paych.SignedVoucher)
getCreatePaymentChannelRecorder func(address.Address, address.Address, abi.TokenAmount)
knownAddreses map[retrievalmarket.RetrievalPeer][]ma.Multiaddr
receivedKnownAddresses map[retrievalmarket.RetrievalPeer]struct{}
expectedKnownAddresses map[retrievalmarket.RetrievalPeer]struct{}
allocateLaneRecorder func(address.Address)
createPaymentVoucherRecorder func(voucher *paych.SignedVoucher)
getCreatePaymentChannelRecorder func(address.Address, address.Address, abi.TokenAmount)
}

// TestRetrievalClientNodeParams are parameters for initializing a TestRetrievalClientNode
Expand Down Expand Up @@ -66,6 +72,9 @@ func NewTestRetrievalClientNode(params TestRetrievalClientNodeParams) *TestRetri
getCreatePaymentChannelRecorder: params.PaymentChannelRecorder,
createPaychMsgCID: params.CreatePaychCID,
addFundsMsgCID: params.AddFundsCID,
knownAddreses: map[retrievalmarket.RetrievalPeer][]ma.Multiaddr{},
expectedKnownAddresses: map[retrievalmarket.RetrievalPeer]struct{}{},
receivedKnownAddresses: map[retrievalmarket.RetrievalPeer]struct{}{},
}
}

Expand Down Expand Up @@ -119,3 +128,25 @@ func (trcn *TestRetrievalClientNode) WaitForPaymentChannelCreation(messageCID ci
}
return trcn.payCh, trcn.waitCreateErr
}

// ExpectKnownAddresses stubs a return for a look up of known addresses for the given retrieval peer
// and the fact that it was looked up is verified with VerifyExpectations
func (trcn *TestRetrievalClientNode) ExpectKnownAddresses(p retrievalmarket.RetrievalPeer, maddrs []ma.Multiaddr) {
trcn.expectedKnownAddresses[p] = struct{}{}
trcn.knownAddreses[p] = maddrs
}

// GetKnownAddresses gets any on known multiaddrs for a given address, so we can add to the peer store
func (trcn *TestRetrievalClientNode) GetKnownAddresses(ctx context.Context, p retrievalmarket.RetrievalPeer, tok shared.TipSetToken) ([]ma.Multiaddr, error) {
trcn.receivedKnownAddresses[p] = struct{}{}
addrs, ok := trcn.knownAddreses[p]
if !ok {
return nil, errors.New("Provider not found")
}
return addrs, nil
}

// VerifyExpectations verifies that all expected known addresses were looked up
func (trcn *TestRetrievalClientNode) VerifyExpectations(t *testing.T) {
require.Equal(t, trcn.expectedKnownAddresses, trcn.receivedKnownAddresses)
}
6 changes: 6 additions & 0 deletions retrievalmarket/network/libp2p_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,13 @@ package network
import (
"bufio"
"context"
"time"

logging "github.com/ipfs/go-log/v2"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
ma "github.com/multiformats/go-multiaddr"

"github.com/filecoin-project/go-fil-markets/retrievalmarket"
)
Expand Down Expand Up @@ -71,3 +73,7 @@ func (impl *libp2pRetrievalMarketNetwork) handleNewQueryStream(s network.Stream)
func (impl *libp2pRetrievalMarketNetwork) ID() peer.ID {
return impl.host.ID()
}

func (impl *libp2pRetrievalMarketNetwork) AddAddrs(p peer.ID, addrs []ma.Multiaddr) {
impl.host.Peerstore().AddAddrs(p, addrs, 8*time.Hour)
}
4 changes: 4 additions & 0 deletions retrievalmarket/network/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package network

import (
"github.com/libp2p/go-libp2p-core/peer"
ma "github.com/multiformats/go-multiaddr"

"github.com/filecoin-project/go-fil-markets/retrievalmarket"
)
Expand Down Expand Up @@ -42,4 +43,7 @@ type RetrievalMarketNetwork interface {

// ID returns the peer id of the host for this network
ID() peer.ID

// AddAddrs adds the given multi-addrs to the peerstore for the passed peer ID
AddAddrs(peer.ID, []ma.Multiaddr)
}
4 changes: 4 additions & 0 deletions retrievalmarket/nodes.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"io"

"github.com/ipfs/go-cid"
ma "github.com/multiformats/go-multiaddr"

"github.com/filecoin-project/go-address"
"github.com/filecoin-project/specs-actors/actors/abi"
Expand Down Expand Up @@ -40,6 +41,9 @@ type RetrievalClientNode interface {
// WaitForPaymentChannelCreation waits for a message on chain that a
// payment channel has been created
WaitForPaymentChannelCreation(messageCID cid.Cid) (address.Address, error)

// GetKnownAddresses gets any on known multiaddrs for a given address, so we can add to the peer store
GetKnownAddresses(ctx context.Context, p RetrievalPeer, tok shared.TipSetToken) ([]ma.Multiaddr, error)
}

// RetrievalProviderNode are the node depedencies for a RetrevalProvider
Expand Down
Loading

0 comments on commit 5f571e5

Please sign in to comment.