Skip to content

Commit

Permalink
feat: markets - separate watching for pre-commit from prove-commit
Browse files Browse the repository at this point in the history
  • Loading branch information
dirkmc committed Nov 24, 2020
1 parent bb25eb9 commit fe3db0b
Show file tree
Hide file tree
Showing 10 changed files with 398 additions and 327 deletions.
1 change: 1 addition & 0 deletions api/api_gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ type GatewayAPI interface {
StateMinerProvingDeadline(ctx context.Context, addr address.Address, tsk types.TipSetKey) (*dline.Info, error)
StateMinerPower(context.Context, address.Address, types.TipSetKey) (*MinerPower, error)
StateNetworkVersion(context.Context, types.TipSetKey) (network.Version, error)
StateSectorGetInfo(ctx context.Context, maddr address.Address, n abi.SectorNumber, tsk types.TipSetKey) (*miner.SectorOnChainInfo, error)
StateVerifiedClientStatus(ctx context.Context, addr address.Address, tsk types.TipSetKey) (*abi.StoragePower, error)
StateWaitMsg(ctx context.Context, msg cid.Cid, confidence uint64) (*MsgLookup, error)
}
5 changes: 5 additions & 0 deletions api/apistruct/struct.go
Original file line number Diff line number Diff line change
Expand Up @@ -427,6 +427,7 @@ type GatewayStruct struct {
StateMarketBalance func(ctx context.Context, addr address.Address, tsk types.TipSetKey) (api.MarketBalance, error)
StateMarketStorageDeal func(ctx context.Context, dealId abi.DealID, tsk types.TipSetKey) (*api.MarketDeal, error)
StateNetworkVersion func(ctx context.Context, tsk types.TipSetKey) (stnetwork.Version, error)
StateSectorGetInfo func(ctx context.Context, maddr address.Address, n abi.SectorNumber, tsk types.TipSetKey) (*miner.SectorOnChainInfo, error)
StateVerifiedClientStatus func(ctx context.Context, addr address.Address, tsk types.TipSetKey) (*abi.StoragePower, error)
StateWaitMsg func(ctx context.Context, msg cid.Cid, confidence uint64) (*api.MsgLookup, error)
}
Expand Down Expand Up @@ -1696,6 +1697,10 @@ func (g GatewayStruct) StateNetworkVersion(ctx context.Context, tsk types.TipSet
return g.Internal.StateNetworkVersion(ctx, tsk)
}

func (g GatewayStruct) StateSectorGetInfo(ctx context.Context, maddr address.Address, n abi.SectorNumber, tsk types.TipSetKey) (*miner.SectorOnChainInfo, error) {
return g.Internal.StateSectorGetInfo(ctx, maddr, n, tsk)
}

func (g GatewayStruct) StateVerifiedClientStatus(ctx context.Context, addr address.Address, tsk types.TipSetKey) (*abi.StoragePower, error) {
return g.Internal.StateVerifiedClientStatus(ctx, addr, tsk)
}
Expand Down
7 changes: 7 additions & 0 deletions cmd/lotus-gateway/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ type gatewayDepsAPI interface {
StateMinerAvailableBalance(context.Context, address.Address, types.TipSetKey) (types.BigInt, error)
StateMinerProvingDeadline(context.Context, address.Address, types.TipSetKey) (*dline.Info, error)
StateCirculatingSupply(context.Context, types.TipSetKey) (abi.TokenAmount, error)
StateSectorGetInfo(ctx context.Context, maddr address.Address, n abi.SectorNumber, tsk types.TipSetKey) (*miner.SectorOnChainInfo, error)
StateVerifiedClientStatus(ctx context.Context, addr address.Address, tsk types.TipSetKey) (*abi.StoragePower, error)
StateVMCirculatingSupplyInternal(context.Context, types.TipSetKey) (api.CirculatingSupply, error)
}
Expand Down Expand Up @@ -362,7 +363,13 @@ func (a *GatewayAPI) StateCirculatingSupply(ctx context.Context, tsk types.TipSe
return types.BigInt{}, err
}
return a.api.StateCirculatingSupply(ctx, tsk)
}

func (a *GatewayAPI) StateSectorGetInfo(ctx context.Context, maddr address.Address, n abi.SectorNumber, tsk types.TipSetKey) (*miner.SectorOnChainInfo, error) {
if err := a.checkTipsetKey(ctx, tsk); err != nil {
return nil, err
}
return a.api.StateSectorGetInfo(ctx, maddr, n, tsk)
}

func (a *GatewayAPI) StateVerifiedClientStatus(ctx context.Context, addr address.Address, tsk types.TipSetKey) (*abi.StoragePower, error) {
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ require (
github.com/filecoin-project/go-crypto v0.0.0-20191218222705-effae4ea9f03
github.com/filecoin-project/go-data-transfer v1.2.0
github.com/filecoin-project/go-fil-commcid v0.0.0-20201016201715-d41df56b4f6a
github.com/filecoin-project/go-fil-markets v1.0.6
github.com/filecoin-project/go-fil-markets v1.0.6-0.20201124103659-6907f90aaadc
github.com/filecoin-project/go-jsonrpc v0.1.2-0.20201008195726-68c6a2704e49
github.com/filecoin-project/go-multistore v0.0.3
github.com/filecoin-project/go-padreader v0.0.0-20200903213702-ed5fae088b20
Expand Down
207 changes: 2 additions & 205 deletions go.sum

Large diffs are not rendered by default.

8 changes: 6 additions & 2 deletions markets/storageadapter/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,8 +217,12 @@ func (c *ClientNodeAdapter) DealProviderCollateralBounds(ctx context.Context, si
return big.Mul(bounds.Min, big.NewInt(clientOverestimation)), bounds.Max, nil
}

func (c *ClientNodeAdapter) OnDealSectorCommitted(ctx context.Context, provider address.Address, dealID abi.DealID, proposal market2.DealProposal, publishCid *cid.Cid, cb storagemarket.DealSectorCommittedCallback) error {
return OnDealSectorCommitted(ctx, c, c.ev, provider, dealID, marketactor.DealProposal(proposal), publishCid, cb)
func (c *ClientNodeAdapter) OnDealSectorPreCommitted(ctx context.Context, provider address.Address, dealID abi.DealID, proposal market2.DealProposal, publishCid *cid.Cid, cb storagemarket.DealSectorPreCommittedCallback) error {
return OnDealSectorPreCommitted(ctx, c, c.ev, provider, dealID, marketactor.DealProposal(proposal), publishCid, cb)
}

func (c *ClientNodeAdapter) OnDealSectorCommitted(ctx context.Context, provider address.Address, dealID abi.DealID, sectorNumber abi.SectorNumber, proposal market2.DealProposal, publishCid *cid.Cid, cb storagemarket.DealSectorCommittedCallback) error {
return OnDealSectorCommitted(ctx, c, c.ev, provider, dealID, sectorNumber, marketactor.DealProposal(proposal), publishCid, cb)
}

func (c *ClientNodeAdapter) OnDealExpiredOrSlashed(ctx context.Context, dealID abi.DealID, onDealExpired storagemarket.DealExpiredCallback, onDealSlashed storagemarket.DealSlashedCallback) error {
Expand Down
217 changes: 160 additions & 57 deletions markets/storageadapter/ondealsectorcommitted.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,110 +20,201 @@ type sectorCommittedEventsAPI interface {
Called(check events.CheckFunc, msgHnd events.MsgHandler, rev events.RevertHandler, confidence int, timeout abi.ChainEpoch, mf events.MsgMatchFunc) error
}

func OnDealSectorCommitted(ctx context.Context, api getCurrentDealInfoAPI, eventsApi sectorCommittedEventsAPI, provider address.Address, dealID abi.DealID, proposal market.DealProposal, publishCid *cid.Cid, cb storagemarket.DealSectorCommittedCallback) error {
type sectorPreCommitAPI interface {
getCurrentDealInfoAPI
StateSectorGetInfo(ctx context.Context, maddr address.Address, n abi.SectorNumber, tsk types.TipSetKey) (*miner.SectorOnChainInfo, error)
}

func OnDealSectorPreCommitted(ctx context.Context, api sectorPreCommitAPI, eventsApi sectorCommittedEventsAPI, provider address.Address, dealID abi.DealID, proposal market.DealProposal, publishCid *cid.Cid, cb storagemarket.DealSectorPreCommittedCallback) error {
// First check if the deal is already active, and if so, bail out
checkFunc := func(ts *types.TipSet) (done bool, more bool, err error) {
newDealID, sd, err := GetCurrentDealInfo(ctx, ts, api, dealID, proposal, publishCid)
isActive, err := checkIfDealAlreadyActive(ctx, api, ts, dealID, proposal, publishCid)
if err != nil {
// TODO: This may be fine for some errors
return false, false, xerrors.Errorf("failed to look up deal on chain: %w", err)
// Note: the error returned from here will end up being returned
// from OnDealSectorPreCommitted so no need to call the callback
// with the error
return false, false, err
}
dealID = newDealID

if sd.State.SectorStartEpoch > 0 {
cb(nil)
if isActive {
// Deal is already active, bail out
cb(0, true, nil)
return true, false, nil
}

// Not yet active, start matching against incoming messages
return false, true, nil
}

var sectorNumber abi.SectorNumber
var sectorFound bool
// Watch for a pre-commit or prove-commit message to the provider.
// It's possible (when the node restarts) that the pre-commit has already
// been submitted, in which case we wait for the prove-commit message,
// so match both pre-commit and prove-commit messages.
matchEvent := func(msg *types.Message) (matched bool, err error) {
if msg.To != provider {
return false, nil
}

switch msg.Method {
case miner.Methods.PreCommitSector:
return true, nil
case miner.Methods.ProveCommitSector:
return true, nil
}

return false, nil
}

// Check if the message params included the deal ID we're looking for.
called := func(msg *types.Message, rec *types.MessageReceipt, ts *types.TipSet, curH abi.ChainEpoch) (more bool, err error) {
defer func() {
if err != nil {
cb(xerrors.Errorf("handling applied event: %w", err))
cb(0, false, xerrors.Errorf("handling applied event: %w", err))
}
}()
switch msg.Method {
case miner.Methods.PreCommitSector:

// Check if waiting for pre-commit timed out
if msg == nil {
return false, xerrors.Errorf("timed out waiting for deal %d pre-commit", dealID)
}

var sectorNumber abi.SectorNumber
var msgDealIDs []abi.DealID
if msg.Method == miner.Methods.PreCommitSector {
// If it's a pre-commit message, the deal IDs are in the message parameters
var params miner.SectorPreCommitInfo
if err := params.UnmarshalCBOR(bytes.NewReader(msg.Params)); err != nil {
return false, xerrors.Errorf("unmarshal pre commit: %w", err)
}

dealID, _, err = GetCurrentDealInfo(ctx, ts, api, dealID, proposal, publishCid)
if err != nil {
return false, err
}

for _, did := range params.DealIDs {
if did == dealID {
sectorNumber = params.SectorNumber
sectorFound = true
return true, nil
}
}
return true, nil
case miner.Methods.ProveCommitSector:
if msg == nil {
log.Error("timed out waiting for deal activation... what now?")
return false, nil
msgDealIDs = params.DealIDs
sectorNumber = params.SectorNumber
} else {
// If it's a prove-commit message, the parameters don't have deal IDs,
// just a sector number
var params miner.ProveCommitSectorParams
if err := params.UnmarshalCBOR(bytes.NewReader(msg.Params)); err != nil {
return false, xerrors.Errorf("failed to unmarshal prove commit sector params: %w", err)
}

_, sd, err := GetCurrentDealInfo(ctx, ts, api, dealID, proposal, publishCid)
// Look up the sector number in miner state to get the deal IDs
sectorNumber = params.SectorNumber
sectorInfo, err := api.StateSectorGetInfo(ctx, provider, sectorNumber, ts.Key())
if err != nil {
return false, xerrors.Errorf("failed to look up deal on chain: %w", err)
return false, xerrors.Errorf("failed to get sector info for sector %d: %w", sectorNumber, err)
}

if sd.State.SectorStartEpoch < 1 {
return false, xerrors.Errorf("deal wasn't active: deal=%d, parentState=%s, h=%d", dealID, ts.ParentState(), ts.Height())
// If there is no sector info for this sector, ignore it
if sectorInfo == nil {
return true, nil
}

log.Infof("Storage deal %d activated at epoch %d", dealID, sd.State.SectorStartEpoch)
msgDealIDs = sectorInfo.DealIDs
}

cb(nil)
// When the deal is published, the deal ID may change, so get the
// current deal ID from the publish message CID
dealID, _, err = GetCurrentDealInfo(ctx, ts, api, dealID, proposal, publishCid)
if err != nil {
return false, err
}

return false, nil
default:
return false, nil
// Check through the deal IDs associated with this message
for _, did := range msgDealIDs {
if did == dealID {
// Found the deal ID in this message. Callback with the sector ID.
// If the message is a prove-commit, then the sector is already active.
isActive := msg.Method == miner.Methods.ProveCommitSector
cb(sectorNumber, isActive, nil)
return false, nil
}
}

// Didn't find the deal ID in this message, so keep looking
return true, nil
}

revert := func(ctx context.Context, ts *types.TipSet) error {
log.Warn("deal activation reverted; TODO: actually handle this!")
log.Warn("deal pre-commit reverted; TODO: actually handle this!")
// TODO: Just go back to DealSealing?
return nil
}

if err := eventsApi.Called(checkFunc, called, revert, int(build.MessageConfidence+1), events.NoTimeout, matchEvent); err != nil {
return xerrors.Errorf("failed to set up called handler: %w", err)
}

return nil
}

func OnDealSectorCommitted(ctx context.Context, api getCurrentDealInfoAPI, eventsApi sectorCommittedEventsAPI, provider address.Address, dealID abi.DealID, sectorNumber abi.SectorNumber, proposal market.DealProposal, publishCid *cid.Cid, cb storagemarket.DealSectorCommittedCallback) error {
// First check if the deal is already active, and if so, bail out
checkFunc := func(ts *types.TipSet) (done bool, more bool, err error) {
isActive, err := checkIfDealAlreadyActive(ctx, api, ts, dealID, proposal, publishCid)
if err != nil {
// Note: the error returned from here will end up being returned
// from OnDealSectorCommitted so no need to call the callback
// with the error
return false, false, err
}

if isActive {
// Deal is already active, bail out
cb(nil)
return true, false, nil
}

// Not yet active, start matching against incoming messages
return false, true, nil
}

// Match a prove-commit sent to the provider with the given sector number
matchEvent := func(msg *types.Message) (matched bool, err error) {
if msg.To != provider {
if msg.To != provider || msg.Method != miner.Methods.ProveCommitSector {
return false, nil
}

switch msg.Method {
case miner.Methods.PreCommitSector:
return !sectorFound, nil
case miner.Methods.ProveCommitSector:
if !sectorFound {
return false, nil
}
var params miner.ProveCommitSectorParams
if err := params.UnmarshalCBOR(bytes.NewReader(msg.Params)); err != nil {
return false, xerrors.Errorf("failed to unmarshal prove commit sector params: %w", err)
}

var params miner.ProveCommitSectorParams
if err := params.UnmarshalCBOR(bytes.NewReader(msg.Params)); err != nil {
return false, xerrors.Errorf("failed to unmarshal prove commit sector params: %w", err)
}
return params.SectorNumber == sectorNumber, nil
}

if params.SectorNumber != sectorNumber {
return false, nil
called := func(msg *types.Message, rec *types.MessageReceipt, ts *types.TipSet, curH abi.ChainEpoch) (more bool, err error) {
defer func() {
if err != nil {
cb(xerrors.Errorf("handling applied event: %w", err))
}
}()

return true, nil
default:
return false, nil
// Check if waiting for prove-commit timed out
if msg == nil {
return false, xerrors.Errorf("timed out waiting for deal activation for deal %d", dealID)
}

// Get the deal info
_, sd, err := GetCurrentDealInfo(ctx, ts, api, dealID, proposal, publishCid)
if err != nil {
return false, xerrors.Errorf("failed to look up deal on chain: %w", err)
}

// Make sure the deal is active
if sd.State.SectorStartEpoch < 1 {
return false, xerrors.Errorf("deal wasn't active: deal=%d, parentState=%s, h=%d", dealID, ts.ParentState(), ts.Height())
}

log.Infof("Storage deal %d activated at epoch %d", dealID, sd.State.SectorStartEpoch)

cb(nil)

return false, nil
}

revert := func(ctx context.Context, ts *types.TipSet) error {
log.Warn("deal activation reverted; TODO: actually handle this!")
// TODO: Just go back to DealSealing?
return nil
}

if err := eventsApi.Called(checkFunc, called, revert, int(build.MessageConfidence+1), events.NoTimeout, matchEvent); err != nil {
Expand All @@ -132,3 +223,15 @@ func OnDealSectorCommitted(ctx context.Context, api getCurrentDealInfoAPI, event

return nil
}

func checkIfDealAlreadyActive(ctx context.Context, api getCurrentDealInfoAPI, ts *types.TipSet, dealID abi.DealID, proposal market.DealProposal, publishCid *cid.Cid) (bool, error) {
_, sd, err := GetCurrentDealInfo(ctx, ts, api, dealID, proposal, publishCid)
if err != nil {
// TODO: This may be fine for some errors
return false, xerrors.Errorf("failed to look up deal on chain: %w", err)
}

// Sector with deal is already active
isActive := sd.State.SectorStartEpoch > 0
return isActive, nil
}
Loading

0 comments on commit fe3db0b

Please sign in to comment.