Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

storagefsm: Trigger input processing when below limits #5801

Merged
merged 7 commits into from
Mar 12, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 67 additions & 0 deletions api/test/deals.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,11 @@ import (
"github.com/filecoin-project/lotus/chain/actors/builtin/market"
"github.com/filecoin-project/lotus/chain/types"
sealing "github.com/filecoin-project/lotus/extern/storage-sealing"
"github.com/filecoin-project/lotus/extern/storage-sealing/sealiface"
"github.com/filecoin-project/lotus/markets/storageadapter"
"github.com/filecoin-project/lotus/node"
"github.com/filecoin-project/lotus/node/impl"
"github.com/filecoin-project/lotus/node/modules/dtypes"
market2 "github.com/filecoin-project/specs-actors/v2/actors/builtin/market"
ipld "github.com/ipfs/go-ipld-format"
dag "github.com/ipfs/go-merkledag"
Expand Down Expand Up @@ -183,6 +185,71 @@ func TestPublishDealsBatching(t *testing.T, b APIBuilder, blocktime time.Duratio
}
}

func TestBatchDealInput(t *testing.T, b APIBuilder, blocktime time.Duration, startEpoch abi.ChainEpoch) {
publishPeriod := 10 * time.Second
maxDealsPerMsg := uint64(4)

// Set max deals per publish deals message to maxDealsPerMsg
minerDef := []StorageMiner{{
Full: 0,
Opts: node.Options(
node.Override(
new(*storageadapter.DealPublisher),
storageadapter.NewDealPublisher(nil, storageadapter.PublishMsgConfig{
Period: publishPeriod,
MaxDealsPerMsg: maxDealsPerMsg,
})),
node.Override(new(dtypes.GetSealingConfigFunc), func() (dtypes.GetSealingConfigFunc, error) {
return func() (sealiface.Config, error) {
return sealiface.Config{
MaxWaitDealsSectors: 1,
MaxSealingSectors: 1,
MaxSealingSectorsForDeals: 2,
AlwaysKeepUnsealedCopy: true,
}, nil
}, nil
}),
),
Preseal: PresealGenesis,
}}

// Create a connect client and miner node
n, sn := b(t, OneFull, minerDef)
client := n[0].FullNode.(*impl.FullNodeAPI)
miner := sn[0]
s := connectAndStartMining(t, b, blocktime, client, miner)
defer s.blockMiner.Stop()

// Starts a deal and waits until it's published
runDealTillSeal := func(rseed int) {
res, _, err := CreateClientFile(s.ctx, s.client, rseed)
require.NoError(t, err)

dc := startDeal(t, s.ctx, s.miner, s.client, res.Root, false, startEpoch)
waitDealSealed(t, s.ctx, s.miner, s.client, dc, false)
}

// Run maxDealsPerMsg+1 deals in parallel
done := make(chan struct{}, maxDealsPerMsg+1)
for rseed := 1; rseed <= int(maxDealsPerMsg+1); rseed++ {
rseed := rseed
go func() {
runDealTillSeal(rseed)
done <- struct{}{}
}()
}

// Wait for maxDealsPerMsg of the deals to be published
for i := 0; i < int(maxDealsPerMsg); i++ {
<-done
}

sl, err := sn[0].SectorsList(s.ctx)
require.NoError(t, err)
require.GreaterOrEqual(t, len(sl), 4)
require.LessOrEqual(t, len(sl), 5)
}

func TestFastRetrievalDealFlow(t *testing.T, b APIBuilder, blocktime time.Duration, startEpoch abi.ChainEpoch) {
s := setupOneClientOneMiner(t, b, blocktime)
defer s.blockMiner.Stop()
Expand Down
35 changes: 34 additions & 1 deletion extern/storage-sealing/fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -300,7 +300,9 @@ func (m *Sealing) plan(events []statemachine.Event, state *SectorInfo) (func(sta

*/

m.stats.updateSector(m.minerSectorID(state.SectorNumber), state.State)
if err := m.onUpdateSector(context.TODO(), state); err != nil {
log.Errorw("update sector stats", "error", err)
}

switch state.State {
// Happy path
Expand Down Expand Up @@ -391,6 +393,37 @@ func (m *Sealing) plan(events []statemachine.Event, state *SectorInfo) (func(sta
return nil, processed, nil
}

func (m *Sealing) onUpdateSector(ctx context.Context, state *SectorInfo) error {
if m.getConfig == nil {
return nil // tests
}

cfg, err := m.getConfig()
if err != nil {
return xerrors.Errorf("getting config: %w", err)
}
sp, err := m.currentSealProof(ctx)
if err != nil {
return xerrors.Errorf("getting seal proof type: %w", err)
}

shouldUpdateInput := m.stats.updateSector(cfg, m.minerSectorID(state.SectorNumber), state.State)

// trigger more input processing when we've dipped below max sealing limits
if shouldUpdateInput {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

changing this to shouldUpdateInput && false makes TestBatchDealInput fail reliably

go func() {
m.inputLk.Lock()
defer m.inputLk.Unlock()

if err := m.updateInput(ctx, sp); err != nil {
log.Errorf("%+v", err)
}
}()
}

return nil
}

func planCommitting(events []statemachine.Event, state *SectorInfo) (uint64, error) {
for i, event := range events {
switch e := event.User.(type) {
Expand Down
11 changes: 3 additions & 8 deletions extern/storage-sealing/garbage.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,18 +28,13 @@ func (m *Sealing) PledgeSector(ctx context.Context) (storage.SectorRef, error) {
return storage.SectorRef{}, xerrors.Errorf("getting seal proof type: %w", err)
}

sid, err := m.sc.Next()
sid, err := m.createSector(ctx, cfg, spt)
if err != nil {
return storage.SectorRef{}, xerrors.Errorf("generating sector number: %w", err)
}
sectorID := m.minerSector(spt, sid)
err = m.sealer.NewSector(ctx, sectorID)
if err != nil {
return storage.SectorRef{}, xerrors.Errorf("notifying sealer of the new sector: %w", err)
return storage.SectorRef{}, err
}

log.Infof("Creating CC sector %d", sid)
return sectorID, m.sectors.Send(uint64(sid), SectorStartCC{
return m.minerSector(spt, sid), m.sectors.Send(uint64(sid), SectorStartCC{
ID: sid,
SectorType: spt,
})
Expand Down
28 changes: 21 additions & 7 deletions extern/storage-sealing/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (

sectorstorage "github.com/filecoin-project/lotus/extern/sector-storage"
"github.com/filecoin-project/lotus/extern/sector-storage/ffiwrapper"
"github.com/filecoin-project/lotus/extern/storage-sealing/sealiface"
)

func (m *Sealing) handleWaitDeals(ctx statemachine.Context, sector SectorInfo) error {
Expand Down Expand Up @@ -388,23 +389,36 @@ func (m *Sealing) tryCreateDealSector(ctx context.Context, sp abi.RegisteredSeal
return nil
}

sid, err := m.createSector(ctx, cfg, sp)
if err != nil {
return err
}

log.Infow("Creating sector", "number", sid, "type", "deal", "proofType", sp)
return m.sectors.Send(uint64(sid), SectorStart{
ID: sid,
SectorType: sp,
})
}

// call with m.inputLk
func (m *Sealing) createSector(ctx context.Context, cfg sealiface.Config, sp abi.RegisteredSealProof) (abi.SectorNumber, error) {
// Now actually create a new sector

sid, err := m.sc.Next()
if err != nil {
return xerrors.Errorf("getting sector number: %w", err)
return 0, xerrors.Errorf("getting sector number: %w", err)
}

err = m.sealer.NewSector(ctx, m.minerSector(sp, sid))
if err != nil {
return xerrors.Errorf("initializing sector: %w", err)
return 0, xerrors.Errorf("initializing sector: %w", err)
}

log.Infow("Creating sector", "number", sid, "type", "deal", "proofType", sp)
return m.sectors.Send(uint64(sid), SectorStart{
ID: sid,
SectorType: sp,
})
// update stats early, fsm planner would do that async
m.stats.updateSector(cfg, m.minerSectorID(sid), UndefinedSectorState)
Comment on lines +418 to +419
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Without this we could end up creating a bunch of sectors at once, and start more parallel work than specified by the max sealing sector config.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems a bit brittle. Can we have a m.createSector function that calls NewSector and updates the stats?


return sid, nil
}

func (m *Sealing) StartPacking(sid abi.SectorNumber) error {
Expand Down
39 changes: 36 additions & 3 deletions extern/storage-sealing/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"sync"

"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/lotus/extern/storage-sealing/sealiface"
)

type statSectorState int
Expand All @@ -23,10 +24,14 @@ type SectorStats struct {
totals [nsst]uint64
}

func (ss *SectorStats) updateSector(id abi.SectorID, st SectorState) {
func (ss *SectorStats) updateSector(cfg sealiface.Config, id abi.SectorID, st SectorState) (updateInput bool) {
ss.lk.Lock()
defer ss.lk.Unlock()

preSealing := ss.curSealingLocked()
preStaging := ss.curStagingLocked()

// update totals
oldst, found := ss.bySector[id]
if found {
ss.totals[oldst]--
Expand All @@ -35,20 +40,48 @@ func (ss *SectorStats) updateSector(id abi.SectorID, st SectorState) {
sst := toStatState(st)
ss.bySector[id] = sst
ss.totals[sst]++

// check if we may need be able to process more deals
sealing := ss.curSealingLocked()
staging := ss.curStagingLocked()

log.Debugw("sector stats", "sealing", sealing, "staging", staging)

if cfg.MaxSealingSectorsForDeals > 0 && // max sealing deal sector limit set
preSealing >= cfg.MaxSealingSectorsForDeals && // we were over limit
sealing < cfg.MaxSealingSectorsForDeals { // and we're below the limit now
updateInput = true
}

if cfg.MaxWaitDealsSectors > 0 && // max waiting deal sector limit set
preStaging >= cfg.MaxWaitDealsSectors && // we were over limit
staging < cfg.MaxWaitDealsSectors { // and we're below the limit now
updateInput = true
}

return updateInput
}

func (ss *SectorStats) curSealingLocked() uint64 {
return ss.totals[sstStaging] + ss.totals[sstSealing] + ss.totals[sstFailed]
}

func (ss *SectorStats) curStagingLocked() uint64 {
return ss.totals[sstStaging]
}

// return the number of sectors currently in the sealing pipeline
func (ss *SectorStats) curSealing() uint64 {
ss.lk.Lock()
defer ss.lk.Unlock()

return ss.totals[sstStaging] + ss.totals[sstSealing] + ss.totals[sstFailed]
return ss.curSealingLocked()
}

// return the number of sectors waiting to enter the sealing pipeline
func (ss *SectorStats) curStaging() uint64 {
ss.lk.Lock()
defer ss.lk.Unlock()

return ss.totals[sstStaging]
return ss.curStagingLocked()
}
3 changes: 3 additions & 0 deletions node/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,9 @@ func TestAPIDealFlow(t *testing.T) {
t.Run("TestPublishDealsBatching", func(t *testing.T) {
test.TestPublishDealsBatching(t, builder.MockSbBuilder, blockTime, dealStartEpoch)
})
t.Run("TestBatchDealInput", func(t *testing.T) {
test.TestBatchDealInput(t, builder.MockSbBuilder, blockTime, dealStartEpoch)
})
}

func TestAPIDealFlowReal(t *testing.T) {
Expand Down