Skip to content

Commit

Permalink
feat: batch publish deal messages
Browse files Browse the repository at this point in the history
  • Loading branch information
dirkmc committed Jan 25, 2021
1 parent 29b076a commit 1dbef15
Show file tree
Hide file tree
Showing 25 changed files with 1,531 additions and 632 deletions.
2 changes: 1 addition & 1 deletion api/apistruct/struct.go
Original file line number Diff line number Diff line change
Expand Up @@ -444,9 +444,9 @@ type GatewayStruct struct {
StateMinerProvingDeadline func(ctx context.Context, addr address.Address, tsk types.TipSetKey) (*dline.Info, error)
StateMinerPower func(context.Context, address.Address, types.TipSetKey) (*api.MinerPower, error)
StateMarketBalance func(ctx context.Context, addr address.Address, tsk types.TipSetKey) (api.MarketBalance, error)
StateSearchMsg func(ctx context.Context, msg cid.Cid) (*api.MsgLookup, error)
StateMarketStorageDeal func(ctx context.Context, dealId abi.DealID, tsk types.TipSetKey) (*api.MarketDeal, error)
StateNetworkVersion func(ctx context.Context, tsk types.TipSetKey) (stnetwork.Version, error)
StateSearchMsg func(ctx context.Context, msg cid.Cid) (*api.MsgLookup, error)
StateSectorGetInfo func(ctx context.Context, maddr address.Address, n abi.SectorNumber, tsk types.TipSetKey) (*miner.SectorOnChainInfo, error)
StateVerifiedClientStatus func(ctx context.Context, addr address.Address, tsk types.TipSetKey) (*abi.StoragePower, error)
StateWaitMsg func(ctx context.Context, msg cid.Cid, confidence uint64) (*api.MsgLookup, error)
Expand Down
96 changes: 96 additions & 0 deletions api/test/deals.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,14 @@ import (
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/actors/builtin/market"
"github.com/filecoin-project/lotus/chain/types"
sealing "github.com/filecoin-project/lotus/extern/storage-sealing"
"github.com/filecoin-project/lotus/markets/storageadapter"
"github.com/filecoin-project/lotus/node"
"github.com/filecoin-project/lotus/node/config"
"github.com/filecoin-project/lotus/node/impl"
market2 "github.com/filecoin-project/specs-actors/v2/actors/builtin/market"
ipld "github.com/ipfs/go-ipld-format"
dag "github.com/ipfs/go-merkledag"
dstest "github.com/ipfs/go-merkledag/test"
Expand Down Expand Up @@ -88,6 +93,97 @@ func CreateClientFile(ctx context.Context, client api.FullNode, rseed int) (*api
return res, data, nil
}

func TestPublishDealsBatching(t *testing.T, b APIBuilder, blocktime time.Duration, startEpoch abi.ChainEpoch) {
publishPeriod := 10 * time.Second
maxDealsPerMsg := uint64(2)

// Set max deals per publish deals message to 2
minerDef := []StorageMiner{{
Full: 0,
Opts: node.Override(
new(*storageadapter.DealPublisher),
storageadapter.NewDealPublisher(nil, &config.PublishMsgConfig{
PublishPeriod: config.Duration(publishPeriod),
MaxDealsPerMsg: maxDealsPerMsg,
})),
Preseal: PresealGenesis,
}}

// Create a connect client and miner node
n, sn := b(t, OneFull, minerDef)
client := n[0].FullNode.(*impl.FullNodeAPI)
miner := sn[0]
s := connectAndStartMining(t, b, blocktime, client, miner)
defer s.blockMiner.Stop()

// Starts a deal and waits until it's published
runDealTillPublish := func(rseed int) {
res, _, err := CreateClientFile(s.ctx, s.client, rseed)
require.NoError(t, err)

upds, err := client.ClientGetDealUpdates(s.ctx)
require.NoError(t, err)

startDeal(t, s.ctx, s.miner, s.client, res.Root, false, startEpoch)

// TODO: this sleep is only necessary because deals don't immediately get logged in the dealstore, we should fix this
time.Sleep(time.Second)

done := make(chan struct{})
go func() {
for upd := range upds {
if upd.DataRef.Root == res.Root && upd.State == storagemarket.StorageDealAwaitingPreCommit {
done <- struct{}{}
}
}
}()
<-done
}

// Run three deals in parallel
done := make(chan struct{}, maxDealsPerMsg+1)
for rseed := 1; rseed <= 3; rseed++ {
rseed := rseed
go func() {
runDealTillPublish(rseed)
done <- struct{}{}
}()
}

// Wait for two of the deals to be published
for i := 0; i < int(maxDealsPerMsg); i++ {
<-done
}

// Expect a single PublishStorageDeals message that includes the first two deals
msgCids, err := s.client.StateListMessages(s.ctx, &api.MessageMatch{To: market.Address}, types.EmptyTSK, 1)
require.NoError(t, err)
count := 0
for _, msgCid := range msgCids {
msg, err := s.client.ChainGetMessage(s.ctx, msgCid)
require.NoError(t, err)

if msg.Method == market.Methods.PublishStorageDeals {
count++
var pubDealsParams market2.PublishStorageDealsParams
err = pubDealsParams.UnmarshalCBOR(bytes.NewReader(msg.Params))
require.NoError(t, err)
require.Len(t, pubDealsParams.Deals, int(maxDealsPerMsg))
}
}
require.Equal(t, 1, count)

// The third deal should be published once the publish period expires.
// Allow a little padding as it takes a moment for the state change to
// be noticed by the client.
padding := 10 * time.Second
select {
case <-time.After(publishPeriod + padding):
require.Fail(t, "Expected 3rd deal to be published once publish period elapsed")
case <-done: // Success
}
}

func TestFastRetrievalDealFlow(t *testing.T, b APIBuilder, blocktime time.Duration, startEpoch abi.ChainEpoch) {
s := setupOneClientOneMiner(t, b, blocktime)
defer s.blockMiner.Stop()
Expand Down
1 change: 1 addition & 0 deletions api/test/test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ const GenesisPreseals = 2
// Options for setting up a mock storage miner
type StorageMiner struct {
Full int
Opts node.Option
Preseal int
}

Expand Down
3 changes: 2 additions & 1 deletion cmd/lotus-storage-miner/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,8 @@ func migratePreSealMeta(ctx context.Context, api lapi.FullNode, metadata string,
PieceCID: commD,
},
DealInfo: &sealing.DealInfo{
DealID: dealID,
DealID: dealID,
DealProposal: sector.Deal,
DealSchedule: sealing.DealSchedule{
StartEpoch: sector.Deal.StartEpoch,
EndEpoch: sector.Deal.EndEpoch,
Expand Down
2 changes: 1 addition & 1 deletion extern/storage-sealing/checks.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func checkPieces(ctx context.Context, maddr address.Address, si SectorInfo, api
continue
}

proposal, err := api.StateMarketStorageDeal(ctx, p.DealInfo.DealID, tok)
proposal, err := api.StateMarketStorageDealProposal(ctx, p.DealInfo.DealID, tok)
if err != nil {
return &ErrInvalidDeals{xerrors.Errorf("getting deal %d for piece %d: %w", p.DealInfo.DealID, i, err)}
}
Expand Down
209 changes: 209 additions & 0 deletions extern/storage-sealing/currentdealinfo.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,209 @@
package sealing

import (
"bytes"
"context"

"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/go-state-types/exitcode"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/chain/actors/builtin/market"
"github.com/filecoin-project/lotus/chain/types"
market2 "github.com/filecoin-project/specs-actors/v2/actors/builtin/market"
"github.com/ipfs/go-cid"
"golang.org/x/xerrors"
)

type CurrentDealInfoAPI interface {
ChainGetMessage(context.Context, cid.Cid) (*types.Message, error)
StateLookupID(context.Context, address.Address, TipSetToken) (address.Address, error)
StateMarketStorageDeal(context.Context, abi.DealID, TipSetToken) (*api.MarketDeal, error)
StateSearchMsg(context.Context, cid.Cid) (*MsgLookup, error)
}

type CurrentDealInfo struct {
DealID abi.DealID
MarketDeal *api.MarketDeal
PublishMsgTipSet TipSetToken
}

type CurrentDealInfoManager struct {
CDAPI CurrentDealInfoAPI
}

// GetCurrentDealInfo gets the current deal state and deal ID.
// Note that the deal ID is assigned when the deal is published, so it may
// have changed if there was a reorg after the deal was published.
func (mgr *CurrentDealInfoManager) GetCurrentDealInfo(ctx context.Context, tok TipSetToken, proposal *market.DealProposal, publishCid cid.Cid) (CurrentDealInfo, error) {
// Lookup the deal ID by comparing the deal proposal to the proposals in
// the publish deals message, and indexing into the message return value
dealID, pubMsgTok, err := mgr.dealIDFromPublishDealsMsg(ctx, tok, proposal, publishCid)
if err != nil {
return CurrentDealInfo{}, err
}

// Lookup the deal state by deal ID
marketDeal, err := mgr.CDAPI.StateMarketStorageDeal(ctx, dealID, tok)
if err == nil && proposal != nil {
// Make sure the retrieved deal proposal matches the target proposal
equal, err := mgr.CheckDealEquality(ctx, tok, *proposal, marketDeal.Proposal)
if err != nil {
return CurrentDealInfo{}, err
}
if !equal {
return CurrentDealInfo{}, xerrors.Errorf("Deal proposals for publish message %s did not match", publishCid)
}
}
return CurrentDealInfo{DealID: dealID, MarketDeal: marketDeal, PublishMsgTipSet: pubMsgTok}, err
}

// dealIDFromPublishDealsMsg looks up the publish deals message by cid, and finds the deal ID
// by looking at the message return value
func (mgr *CurrentDealInfoManager) dealIDFromPublishDealsMsg(ctx context.Context, tok TipSetToken, proposal *market.DealProposal, publishCid cid.Cid) (abi.DealID, TipSetToken, error) {
dealID := abi.DealID(0)

// Get the return value of the publish deals message
lookup, err := mgr.CDAPI.StateSearchMsg(ctx, publishCid)
if err != nil {
return dealID, nil, xerrors.Errorf("looking for publish deal message %s: search msg failed: %w", publishCid, err)
}

if lookup.Receipt.ExitCode != exitcode.Ok {
return dealID, nil, xerrors.Errorf("looking for publish deal message %s: non-ok exit code: %s", publishCid, lookup.Receipt.ExitCode)
}

var retval market.PublishStorageDealsReturn
if err := retval.UnmarshalCBOR(bytes.NewReader(lookup.Receipt.Return)); err != nil {
return dealID, nil, xerrors.Errorf("looking for publish deal message %s: unmarshalling message return: %w", publishCid, err)
}

// Previously, publish deals messages contained a single deal, and the
// deal proposal was not included in the sealing deal info.
// So check if the proposal is nil and check the number of deals published
// in the message.
if proposal == nil {
if len(retval.IDs) > 1 {
return dealID, nil, xerrors.Errorf(
"getting deal ID from publish deal message %s: "+
"no deal proposal supplied but message return value has more than one deal (%d deals)",
publishCid, len(retval.IDs))
}

// There is a single deal in this publish message and no deal proposal
// was supplied, so we have nothing to compare against. Just assume
// the deal ID is correct.
return retval.IDs[0], lookup.TipSetTok, nil
}

// Get the parameters to the publish deals message
pubmsg, err := mgr.CDAPI.ChainGetMessage(ctx, publishCid)
if err != nil {
return dealID, nil, xerrors.Errorf("getting publish deal message %s: %w", publishCid, err)
}

var pubDealsParams market2.PublishStorageDealsParams
if err := pubDealsParams.UnmarshalCBOR(bytes.NewReader(pubmsg.Params)); err != nil {
return dealID, nil, xerrors.Errorf("unmarshalling publish deal message params for message %s: %w", publishCid, err)
}

// Scan through the deal proposals in the message parameters to find the
// index of the target deal proposal
dealIdx := -1
for i, paramDeal := range pubDealsParams.Deals {
eq, err := mgr.CheckDealEquality(ctx, tok, *proposal, market.DealProposal(paramDeal.Proposal))
if err != nil {
return dealID, nil, xerrors.Errorf("comparing publish deal message %s proposal to deal proposal: %w", publishCid, err)
}
if eq {
dealIdx = i
break
}
}

if dealIdx == -1 {
return dealID, nil, xerrors.Errorf("could not find deal in publish deals message %s", publishCid)
}

if dealIdx >= len(retval.IDs) {
return dealID, nil, xerrors.Errorf(
"deal index %d out of bounds of deals (len %d) in publish deals message %s",
dealIdx, len(retval.IDs), publishCid)
}

return retval.IDs[dealIdx], lookup.TipSetTok, nil
}

func (mgr *CurrentDealInfoManager) CheckDealEquality(ctx context.Context, tok TipSetToken, p1, p2 market.DealProposal) (bool, error) {
p1ClientID, err := mgr.CDAPI.StateLookupID(ctx, p1.Client, tok)
if err != nil {
return false, err
}
p2ClientID, err := mgr.CDAPI.StateLookupID(ctx, p2.Client, tok)
if err != nil {
return false, err
}
return p1.PieceCID.Equals(p2.PieceCID) &&
p1.PieceSize == p2.PieceSize &&
p1.VerifiedDeal == p2.VerifiedDeal &&
p1.Label == p2.Label &&
p1.StartEpoch == p2.StartEpoch &&
p1.EndEpoch == p2.EndEpoch &&
p1.StoragePricePerEpoch.Equals(p2.StoragePricePerEpoch) &&
p1.ProviderCollateral.Equals(p2.ProviderCollateral) &&
p1.ClientCollateral.Equals(p2.ClientCollateral) &&
p1.Provider == p2.Provider &&
p1ClientID == p2ClientID, nil
}

type CurrentDealInfoTskAPI interface {
ChainGetMessage(ctx context.Context, mc cid.Cid) (*types.Message, error)
StateLookupID(context.Context, address.Address, types.TipSetKey) (address.Address, error)
StateMarketStorageDeal(context.Context, abi.DealID, types.TipSetKey) (*api.MarketDeal, error)
StateSearchMsg(context.Context, cid.Cid) (*api.MsgLookup, error)
}

type CurrentDealInfoAPIAdapter struct {
CurrentDealInfoTskAPI
}

func (c *CurrentDealInfoAPIAdapter) StateLookupID(ctx context.Context, a address.Address, tok TipSetToken) (address.Address, error) {
tsk, err := types.TipSetKeyFromBytes(tok)
if err != nil {
return address.Undef, xerrors.Errorf("failed to unmarshal TipSetToken to TipSetKey: %w", err)
}

return c.CurrentDealInfoTskAPI.StateLookupID(ctx, a, tsk)
}

func (c *CurrentDealInfoAPIAdapter) StateMarketStorageDeal(ctx context.Context, dealID abi.DealID, tok TipSetToken) (*api.MarketDeal, error) {
tsk, err := types.TipSetKeyFromBytes(tok)
if err != nil {
return nil, xerrors.Errorf("failed to unmarshal TipSetToken to TipSetKey: %w", err)
}

return c.CurrentDealInfoTskAPI.StateMarketStorageDeal(ctx, dealID, tsk)
}

func (c *CurrentDealInfoAPIAdapter) StateSearchMsg(ctx context.Context, k cid.Cid) (*MsgLookup, error) {
wmsg, err := c.CurrentDealInfoTskAPI.StateSearchMsg(ctx, k)
if err != nil {
return nil, err
}

if wmsg == nil {
return nil, nil
}

return &MsgLookup{
Receipt: MessageReceipt{
ExitCode: wmsg.Receipt.ExitCode,
Return: wmsg.Receipt.Return,
GasUsed: wmsg.Receipt.GasUsed,
},
TipSetTok: wmsg.TipSet.Bytes(),
Height: wmsg.Height,
}, nil
}

var _ CurrentDealInfoAPI = (*CurrentDealInfoAPIAdapter)(nil)
Loading

0 comments on commit 1dbef15

Please sign in to comment.