Skip to content

Commit

Permalink
feat: batch publish deal messages
Browse files Browse the repository at this point in the history
  • Loading branch information
dirkmc committed Jan 25, 2021
1 parent 1b494ac commit 09c463b
Show file tree
Hide file tree
Showing 16 changed files with 968 additions and 607 deletions.
2 changes: 1 addition & 1 deletion api/apistruct/struct.go
Original file line number Diff line number Diff line change
Expand Up @@ -444,9 +444,9 @@ type GatewayStruct struct {
StateMinerProvingDeadline func(ctx context.Context, addr address.Address, tsk types.TipSetKey) (*dline.Info, error)
StateMinerPower func(context.Context, address.Address, types.TipSetKey) (*api.MinerPower, error)
StateMarketBalance func(ctx context.Context, addr address.Address, tsk types.TipSetKey) (api.MarketBalance, error)
StateSearchMsg func(ctx context.Context, msg cid.Cid) (*api.MsgLookup, error)
StateMarketStorageDeal func(ctx context.Context, dealId abi.DealID, tsk types.TipSetKey) (*api.MarketDeal, error)
StateNetworkVersion func(ctx context.Context, tsk types.TipSetKey) (stnetwork.Version, error)
StateSearchMsg func(ctx context.Context, msg cid.Cid) (*api.MsgLookup, error)
StateSectorGetInfo func(ctx context.Context, maddr address.Address, n abi.SectorNumber, tsk types.TipSetKey) (*miner.SectorOnChainInfo, error)
StateVerifiedClientStatus func(ctx context.Context, addr address.Address, tsk types.TipSetKey) (*abi.StoragePower, error)
StateWaitMsg func(ctx context.Context, msg cid.Cid, confidence uint64) (*api.MsgLookup, error)
Expand Down
96 changes: 96 additions & 0 deletions api/test/deals.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,14 @@ import (
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/actors/builtin/market"
"github.com/filecoin-project/lotus/chain/types"
sealing "github.com/filecoin-project/lotus/extern/storage-sealing"
"github.com/filecoin-project/lotus/markets/storageadapter"
"github.com/filecoin-project/lotus/node"
"github.com/filecoin-project/lotus/node/config"
"github.com/filecoin-project/lotus/node/impl"
market2 "github.com/filecoin-project/specs-actors/v2/actors/builtin/market"
ipld "github.com/ipfs/go-ipld-format"
dag "github.com/ipfs/go-merkledag"
dstest "github.com/ipfs/go-merkledag/test"
Expand Down Expand Up @@ -88,6 +93,97 @@ func CreateClientFile(ctx context.Context, client api.FullNode, rseed int) (*api
return res, data, nil
}

func TestPublishDealsBatching(t *testing.T, b APIBuilder, blocktime time.Duration, startEpoch abi.ChainEpoch) {
publishPeriod := 10 * time.Second
maxDealsPerMsg := uint64(2)

// Set max deals per publish deals message to 2
minerDef := []StorageMiner{{
Full: 0,
Opts: node.Override(
new(*storageadapter.DealPublisher),
storageadapter.NewDealPublisher(nil, &config.PublishMsgConfig{
PublishPeriod: config.Duration(publishPeriod),
MaxDealsPerMsg: maxDealsPerMsg,
})),
Preseal: PresealGenesis,
}}

// Create a connect client and miner node
n, sn := b(t, OneFull, minerDef)
client := n[0].FullNode.(*impl.FullNodeAPI)
miner := sn[0]
s := connectAndStartMining(t, b, blocktime, client, miner)
defer s.blockMiner.Stop()

// Starts a deal and waits until it's published
runDealTillPublish := func(rseed int) {
res, _, err := CreateClientFile(s.ctx, s.client, rseed)
require.NoError(t, err)

upds, err := client.ClientGetDealUpdates(s.ctx)
require.NoError(t, err)

startDeal(t, s.ctx, s.miner, s.client, res.Root, false, startEpoch)

// TODO: this sleep is only necessary because deals don't immediately get logged in the dealstore, we should fix this
time.Sleep(time.Second)

done := make(chan struct{})
go func() {
for upd := range upds {
if upd.DataRef.Root == res.Root && upd.State == storagemarket.StorageDealAwaitingPreCommit {
done <- struct{}{}
}
}
}()
<-done
}

// Run three deals in parallel
done := make(chan struct{}, maxDealsPerMsg+1)
for rseed := 1; rseed <= 3; rseed++ {
rseed := rseed
go func() {
runDealTillPublish(rseed)
done <- struct{}{}
}()
}

// Wait for two of the deals to be published
for i := 0; i < int(maxDealsPerMsg); i++ {
<-done
}

// Expect a single PublishStorageDeals message that includes the first two deals
msgCids, err := s.client.StateListMessages(s.ctx, &api.MessageMatch{To: market.Address}, types.EmptyTSK, 1)
require.NoError(t, err)
count := 0
for _, msgCid := range msgCids {
msg, err := s.client.ChainGetMessage(s.ctx, msgCid)
require.NoError(t, err)

if msg.Method == market.Methods.PublishStorageDeals {
count++
var pubDealsParams market2.PublishStorageDealsParams
err = pubDealsParams.UnmarshalCBOR(bytes.NewReader(msg.Params))
require.NoError(t, err)
require.Len(t, pubDealsParams.Deals, int(maxDealsPerMsg))
}
}
require.Equal(t, 1, count)

// The third deal should be published once the publish period expires.
// Allow a little padding as it takes a moment for the state change to
// be noticed by the client.
padding := 10 * time.Second
select {
case <-time.After(publishPeriod + padding):
require.Fail(t, "Expected 3rd deal to be published once publish period elapsed")
case <-done: // Success
}
}

func TestFastRetrievalDealFlow(t *testing.T, b APIBuilder, blocktime time.Duration, startEpoch abi.ChainEpoch) {
s := setupOneClientOneMiner(t, b, blocktime)
defer s.blockMiner.Stop()
Expand Down
1 change: 1 addition & 0 deletions api/test/test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ const GenesisPreseals = 2
// Options for setting up a mock storage miner
type StorageMiner struct {
Full int
Opts node.Option
Preseal int
}

Expand Down
20 changes: 13 additions & 7 deletions markets/storageadapter/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,11 @@ import (

type ClientNodeAdapter struct {
*clientApi
*apiWrapper

fundmgr *market.FundManager
ev *events.Events
dsMatcher *dealStateMatcher
scMgr *SectorCommittedManager
}

type clientApi struct {
Expand All @@ -50,14 +50,16 @@ type clientApi struct {

func NewClientNodeAdapter(stateapi full.StateAPI, chain full.ChainAPI, mpool full.MpoolAPI, fundmgr *market.FundManager) storagemarket.StorageClientNode {
capi := &clientApi{chain, stateapi, mpool}
return &ClientNodeAdapter{
clientApi: capi,
apiWrapper: &apiWrapper{api: capi},
ev := events.NewEvents(context.TODO(), capi)
a := &ClientNodeAdapter{
clientApi: capi,

fundmgr: fundmgr,
ev: events.NewEvents(context.TODO(), capi),
ev: ev,
dsMatcher: newDealStateMatcher(state.NewStatePredicates(state.WrapFastAPI(capi))),
}
a.scMgr = NewSectorCommittedManager(ev, a, &apiWrapper{api: capi})
return a
}

func (c *ClientNodeAdapter) ListStorageProviders(ctx context.Context, encodedTs shared.TipSetToken) ([]*storagemarket.StorageProviderInfo, error) {
Expand Down Expand Up @@ -135,6 +137,7 @@ func (c *ClientNodeAdapter) GetBalance(ctx context.Context, addr address.Address

// ValidatePublishedDeal validates that the provided deal has appeared on chain and references the same ClientDeal
// returns the Deal id if there is no error
// TODO: Don't return deal ID
func (c *ClientNodeAdapter) ValidatePublishedDeal(ctx context.Context, deal storagemarket.ClientDeal) (abi.DealID, error) {
log.Infow("DEAL ACCEPTED!")

Expand Down Expand Up @@ -216,14 +219,17 @@ func (c *ClientNodeAdapter) DealProviderCollateralBounds(ctx context.Context, si
return big.Mul(bounds.Min, big.NewInt(clientOverestimation)), bounds.Max, nil
}

// TODO: Remove dealID parameter, change publishCid to be cid.Cid (instead of pointer)
func (c *ClientNodeAdapter) OnDealSectorPreCommitted(ctx context.Context, provider address.Address, dealID abi.DealID, proposal market2.DealProposal, publishCid *cid.Cid, cb storagemarket.DealSectorPreCommittedCallback) error {
return OnDealSectorPreCommitted(ctx, c, c.ev, provider, dealID, marketactor.DealProposal(proposal), publishCid, cb)
return c.scMgr.OnDealSectorPreCommitted(ctx, provider, marketactor.DealProposal(proposal), *publishCid, cb)
}

// TODO: Remove dealID parameter, change publishCid to be cid.Cid (instead of pointer)
func (c *ClientNodeAdapter) OnDealSectorCommitted(ctx context.Context, provider address.Address, dealID abi.DealID, sectorNumber abi.SectorNumber, proposal market2.DealProposal, publishCid *cid.Cid, cb storagemarket.DealSectorCommittedCallback) error {
return OnDealSectorCommitted(ctx, c, c.ev, provider, dealID, sectorNumber, marketactor.DealProposal(proposal), publishCid, cb)
return c.scMgr.OnDealSectorCommitted(ctx, provider, sectorNumber, marketactor.DealProposal(proposal), *publishCid, cb)
}

// TODO: Replace dealID parameter with DealProposal
func (c *ClientNodeAdapter) OnDealExpiredOrSlashed(ctx context.Context, dealID abi.DealID, onDealExpired storagemarket.DealExpiredCallback, onDealSlashed storagemarket.DealSlashedCallback) error {
head, err := c.ChainHead(ctx)
if err != nil {
Expand Down
Loading

0 comments on commit 09c463b

Please sign in to comment.