Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Unique Blockstores: RetrievalMarket #342

Merged
merged 4 commits into from
Jul 28, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/retrievalprovider.mmd
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ stateDiagram-v2
The following events are not shown cause they can trigger from any state.

ProviderEventDataTransferError - transitions state to DealStatusErrored
ProviderEventMultiStoreError - transitions state to DealStatusErrored
end note
0 --> 0 : ProviderEventOpen
0 --> 1 : ProviderEventDealAccepted
Expand Down
Binary file modified docs/retrievalprovider.mmd.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
6 changes: 3 additions & 3 deletions docs/retrievalprovider.mmd.svg
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
2 changes: 2 additions & 0 deletions retrievalmarket/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/libp2p/go-libp2p-core/peer"

"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-multistore"
"github.com/filecoin-project/specs-actors/actors/abi"
)

Expand Down Expand Up @@ -37,6 +38,7 @@ type RetrievalClient interface {
miner peer.ID,
clientWallet address.Address,
minerWallet address.Address,
storeID *multistore.StoreID,
) (DealID, error)

// SubscribeToEvents listens for events that happen related to client retrievals
Expand Down
4 changes: 4 additions & 0 deletions retrievalmarket/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,9 @@ const (

// ProviderEventCleanupComplete happens when a deal is finished cleaning up and enters a complete state
ProviderEventCleanupComplete

// ProviderEventMultiStoreError occurs when an error happens attempting to operate on the multistore
ProviderEventMultiStoreError
)

// ProviderEvents is a human readable map of provider event name -> event description
Expand All @@ -205,4 +208,5 @@ var ProviderEvents = map[ProviderEvent]string{
ProviderEventDataTransferError: "ProviderEventDataTransferError",
ProviderEventCancelComplete: "ProviderEventCancelComplete",
ProviderEventCleanupComplete: "ProviderEventCleanupComplete",
ProviderEventMultiStoreError: "ProviderEventMultiStoreError",
}
39 changes: 33 additions & 6 deletions retrievalmarket/impl/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,13 @@ import (
"github.com/hannahhoward/go-pubsub"
"github.com/ipfs/go-cid"
"github.com/ipfs/go-datastore"
blockstore "github.com/ipfs/go-ipfs-blockstore"
logging "github.com/ipfs/go-log/v2"
"github.com/libp2p/go-libp2p-core/peer"
"golang.org/x/xerrors"

"github.com/filecoin-project/go-address"
datatransfer "github.com/filecoin-project/go-data-transfer"
"github.com/filecoin-project/go-multistore"
"github.com/filecoin-project/go-statemachine/fsm"
"github.com/filecoin-project/go-storedcounter"
"github.com/filecoin-project/specs-actors/actors/abi"
Expand All @@ -32,7 +32,7 @@ var log = logging.Logger("retrieval")
type Client struct {
network rmnet.RetrievalMarketNetwork
dataTransfer datatransfer.Manager
bs blockstore.Blockstore
multiStore *multistore.MultiStore
node retrievalmarket.RetrievalClientNode
storedCounter *storedcounter.StoredCounter

Expand Down Expand Up @@ -64,7 +64,7 @@ var _ retrievalmarket.RetrievalClient = &Client{}
// NewClient creates a new retrieval client
func NewClient(
network rmnet.RetrievalMarketNetwork,
bs blockstore.Blockstore,
multiStore *multistore.MultiStore,
dataTransfer datatransfer.Manager,
node retrievalmarket.RetrievalClientNode,
resolver retrievalmarket.PeerResolver,
Expand All @@ -73,7 +73,7 @@ func NewClient(
) (retrievalmarket.RetrievalClient, error) {
c := &Client{
network: network,
bs: bs,
multiStore: multiStore,
dataTransfer: dataTransfer,
node: node,
resolver: resolver,
Expand Down Expand Up @@ -106,6 +106,10 @@ func NewClient(
return nil, err
}
dataTransfer.SubscribeToEvents(dtutils.ClientDataTransferSubscriber(c.stateMachines))
err = dataTransfer.RegisterTransportConfigurer(&retrievalmarket.DealProposal{}, dtutils.TransportConfigurer(network.ID(), &clientStoreGetter{c}))
if err != nil {
return nil, err
}
return c, nil
}

Expand Down Expand Up @@ -177,14 +181,20 @@ From then on, the statemachine controls the deal flow in the client. Other compo

Documentation of the client state machine can be found at https://godoc.org/github.com/filecoin-project/go-fil-markets/retrievalmarket/impl/clientstates
*/
func (c *Client) Retrieve(ctx context.Context, payloadCID cid.Cid, params retrievalmarket.Params, totalFunds abi.TokenAmount, miner peer.ID, clientWallet address.Address, minerWallet address.Address) (retrievalmarket.DealID, error) {
func (c *Client) Retrieve(ctx context.Context, payloadCID cid.Cid, params retrievalmarket.Params, totalFunds abi.TokenAmount, miner peer.ID, clientWallet address.Address, minerWallet address.Address, storeID *multistore.StoreID) (retrievalmarket.DealID, error) {
var err error
next, err := c.storedCounter.Next()
if err != nil {
return 0, err
}
// make sure the store is loadable
if storeID != nil {
_, err = c.multiStore.Get(*storeID)
if err != nil {
return 0, err
}
}
dealID := retrievalmarket.DealID(next)

dealState := retrievalmarket.ClientDealState{
DealProposal: retrievalmarket.DealProposal{
PayloadCID: payloadCID,
Expand All @@ -202,6 +212,7 @@ func (c *Client) Retrieve(ctx context.Context, payloadCID cid.Cid, params retrie
Status: retrievalmarket.DealStatusNew,
Sender: miner,
UnsealFundsPaid: big.Zero(),
StoreID: storeID,
}

// start the deal processing
Expand Down Expand Up @@ -286,6 +297,22 @@ func (c *clientDealEnvironment) CloseDataTransfer(ctx context.Context, channelID
return c.c.dataTransfer.CloseDataTransferChannel(ctx, channelID)
}

type clientStoreGetter struct {
c *Client
}

func (csg *clientStoreGetter) Get(otherPeer peer.ID, dealID retrievalmarket.DealID) (*multistore.Store, error) {
var deal retrievalmarket.ClientDealState
err := csg.c.stateMachines.Get(dealID).Get(&deal)
if err != nil {
return nil, err
}
if deal.StoreID == nil {
return nil, nil
}
return csg.c.multiStore.Get(*deal.StoreID)
}

// ClientFSMParameterSpec is a valid set of parameters for a client deal FSM - used in doc generation
var ClientFSMParameterSpec = fsm.Parameters{
Environment: &clientDealEnvironment{},
Expand Down
32 changes: 19 additions & 13 deletions retrievalmarket/impl/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,12 @@ import (

"github.com/ipfs/go-datastore"
dss "github.com/ipfs/go-datastore/sync"
bstore "github.com/ipfs/go-ipfs-blockstore"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-multistore"
"github.com/filecoin-project/go-storedcounter"
"github.com/filecoin-project/specs-actors/actors/abi"

Expand All @@ -28,12 +28,13 @@ func TestClient_Construction(t *testing.T) {

ds := dss.MutexWrap(datastore.NewMapDatastore())
storedCounter := storedcounter.New(ds, datastore.NewKey("nextDealID"))
bs := bstore.NewBlockstore(ds)
multiStore, err := multistore.NewMultiDstore(ds)
require.NoError(t, err)
dt := tut.NewTestDataTransfer()
net := tut.NewTestRetrievalMarketNetwork(tut.TestNetworkParams{})
_, err := retrievalimpl.NewClient(
_, err = retrievalimpl.NewClient(
net,
bs,
multiStore,
dt,
testnodes.NewTestRetrievalClientNode(testnodes.TestRetrievalClientNodeParams{}),
&tut.TestPeerResolver{},
Expand All @@ -50,14 +51,18 @@ func TestClient_Construction(t *testing.T) {
require.True(t, ok)
_, ok = dt.RegisteredVoucherTypes[1].VoucherType.(*retrievalmarket.DealPayment)
require.True(t, ok)
require.Len(t, dt.RegisteredTransportConfigurers, 1)
_, ok = dt.RegisteredTransportConfigurers[0].VoucherType.(*retrievalmarket.DealProposal)
require.True(t, ok)
}

func TestClient_Query(t *testing.T) {
ctx := context.Background()

ds := dss.MutexWrap(datastore.NewMapDatastore())
storedCounter := storedcounter.New(ds, datastore.NewKey("nextDealID"))
bs := bstore.NewBlockstore(ds)
multiStore, err := multistore.NewMultiDstore(ds)
require.NoError(t, err)
dt := tut.NewTestDataTransfer()

pcid := tut.GenerateCids(1)[0]
Expand Down Expand Up @@ -92,7 +97,7 @@ func TestClient_Query(t *testing.T) {
})
c, err := retrievalimpl.NewClient(
net,
bs,
multiStore,
dt,
testnodes.NewTestRetrievalClientNode(testnodes.TestRetrievalClientNodeParams{}),
&tut.TestPeerResolver{},
Expand All @@ -112,7 +117,7 @@ func TestClient_Query(t *testing.T) {
})
c, err := retrievalimpl.NewClient(
net,
bs,
multiStore,
dt,
testnodes.NewTestRetrievalClientNode(testnodes.TestRetrievalClientNodeParams{}),
&tut.TestPeerResolver{},
Expand All @@ -139,7 +144,7 @@ func TestClient_Query(t *testing.T) {
})
c, err := retrievalimpl.NewClient(
net,
bs,
multiStore,
dt,
testnodes.NewTestRetrievalClientNode(testnodes.TestRetrievalClientNodeParams{}),
&tut.TestPeerResolver{},
Expand All @@ -165,7 +170,7 @@ func TestClient_Query(t *testing.T) {
})
c, err := retrievalimpl.NewClient(
net,
bs,
multiStore,
dt,
testnodes.NewTestRetrievalClientNode(testnodes.TestRetrievalClientNodeParams{}),
&tut.TestPeerResolver{},
Expand All @@ -182,7 +187,8 @@ func TestClient_Query(t *testing.T) {
func TestClient_FindProviders(t *testing.T) {
ds := dss.MutexWrap(datastore.NewMapDatastore())
storedCounter := storedcounter.New(ds, datastore.NewKey("nextDealID"))
bs := bstore.NewBlockstore(ds)
multiStore, err := multistore.NewMultiDstore(ds)
require.NoError(t, err)
dt := tut.NewTestDataTransfer()
expectedPeer := peer.ID("somevalue")

Expand All @@ -200,7 +206,7 @@ func TestClient_FindProviders(t *testing.T) {
peers := tut.RequireGenerateRetrievalPeers(t, 3)
testResolver := tut.TestPeerResolver{Peers: peers}

c, err := retrievalimpl.NewClient(net, bs, dt, &testnodes.TestRetrievalClientNode{}, &testResolver, ds, storedCounter)
c, err := retrievalimpl.NewClient(net, multiStore, dt, &testnodes.TestRetrievalClientNode{}, &testResolver, ds, storedCounter)
require.NoError(t, err)

testCid := tut.GenerateCids(1)[0]
Expand All @@ -209,7 +215,7 @@ func TestClient_FindProviders(t *testing.T) {

t.Run("when there is an error, returns empty provider list", func(t *testing.T) {
testResolver := tut.TestPeerResolver{Peers: []retrievalmarket.RetrievalPeer{}, ResolverError: errors.New("boom")}
c, err := retrievalimpl.NewClient(net, bs, dt, &testnodes.TestRetrievalClientNode{}, &testResolver, ds, storedCounter)
c, err := retrievalimpl.NewClient(net, multiStore, dt, &testnodes.TestRetrievalClientNode{}, &testResolver, ds, storedCounter)
require.NoError(t, err)

badCid := tut.GenerateCids(1)[0]
Expand All @@ -218,7 +224,7 @@ func TestClient_FindProviders(t *testing.T) {

t.Run("when there are no providers", func(t *testing.T) {
testResolver := tut.TestPeerResolver{Peers: []retrievalmarket.RetrievalPeer{}}
c, err := retrievalimpl.NewClient(net, bs, dt, &testnodes.TestRetrievalClientNode{}, &testResolver, ds, storedCounter)
c, err := retrievalimpl.NewClient(net, multiStore, dt, &testnodes.TestRetrievalClientNode{}, &testResolver, ds, storedCounter)
require.NoError(t, err)

testCid := tut.GenerateCids(1)[0]
Expand Down
41 changes: 41 additions & 0 deletions retrievalmarket/impl/dtutils/dtutils.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,11 @@ import (
"math"

logging "github.com/ipfs/go-log/v2"
"github.com/ipld/go-ipld-prime"
peer "github.com/libp2p/go-libp2p-core/peer"

datatransfer "github.com/filecoin-project/go-data-transfer"
"github.com/filecoin-project/go-multistore"
"github.com/filecoin-project/go-statemachine/fsm"

rm "github.com/filecoin-project/go-fil-markets/retrievalmarket"
Expand Down Expand Up @@ -134,3 +137,41 @@ func ClientDataTransferSubscriber(deals EventReceiver) datatransfer.Subscriber {
}
}
}

// StoreGetter retrieves the store for a given proposal cid
type StoreGetter interface {
Get(otherPeer peer.ID, dealID rm.DealID) (*multistore.Store, error)
}

// StoreConfigurableTransport defines the methods needed to
// configure a data transfer transport use a unique store for a given request
type StoreConfigurableTransport interface {
UseStore(datatransfer.ChannelID, ipld.Loader, ipld.Storer) error
}

// TransportConfigurer configurers the graphsync transport to use a custom blockstore per deal
func TransportConfigurer(thisPeer peer.ID, storeGetter StoreGetter) datatransfer.TransportConfigurer {
return func(channelID datatransfer.ChannelID, voucher datatransfer.Voucher, transport datatransfer.Transport) {
dealProposal, ok := voucher.(*rm.DealProposal)
if !ok {
return
}
gsTransport, ok := transport.(StoreConfigurableTransport)
if !ok {
return
}
otherPeer := channelID.OtherParty(thisPeer)
store, err := storeGetter.Get(otherPeer, dealProposal.ID)
if err != nil {
log.Errorf("attempting to configure data store: %w", err)
return
}
if store == nil {
return
}
err = gsTransport.UseStore(channelID, store.Loader, store.Storer)
if err != nil {
log.Errorf("attempting to configure data store: %w", err)
}
}
}
Loading