diff --git a/api/test/deals.go b/api/test/deals.go index f3d22d26c32..7a9454bae38 100644 --- a/api/test/deals.go +++ b/api/test/deals.go @@ -23,9 +23,11 @@ import ( "github.com/filecoin-project/lotus/chain/actors/builtin/market" "github.com/filecoin-project/lotus/chain/types" sealing "github.com/filecoin-project/lotus/extern/storage-sealing" + "github.com/filecoin-project/lotus/extern/storage-sealing/sealiface" "github.com/filecoin-project/lotus/markets/storageadapter" "github.com/filecoin-project/lotus/node" "github.com/filecoin-project/lotus/node/impl" + "github.com/filecoin-project/lotus/node/modules/dtypes" market2 "github.com/filecoin-project/specs-actors/v2/actors/builtin/market" ipld "github.com/ipfs/go-ipld-format" dag "github.com/ipfs/go-merkledag" @@ -183,6 +185,71 @@ func TestPublishDealsBatching(t *testing.T, b APIBuilder, blocktime time.Duratio } } +func TestBatchDealInput(t *testing.T, b APIBuilder, blocktime time.Duration, startEpoch abi.ChainEpoch) { + publishPeriod := 10 * time.Second + maxDealsPerMsg := uint64(4) + + // Set max deals per publish deals message to maxDealsPerMsg + minerDef := []StorageMiner{{ + Full: 0, + Opts: node.Options( + node.Override( + new(*storageadapter.DealPublisher), + storageadapter.NewDealPublisher(nil, storageadapter.PublishMsgConfig{ + Period: publishPeriod, + MaxDealsPerMsg: maxDealsPerMsg, + })), + node.Override(new(dtypes.GetSealingConfigFunc), func() (dtypes.GetSealingConfigFunc, error) { + return func() (sealiface.Config, error) { + return sealiface.Config{ + MaxWaitDealsSectors: 1, + MaxSealingSectors: 1, + MaxSealingSectorsForDeals: 2, + AlwaysKeepUnsealedCopy: true, + }, nil + }, nil + }), + ), + Preseal: PresealGenesis, + }} + + // Create a connect client and miner node + n, sn := b(t, OneFull, minerDef) + client := n[0].FullNode.(*impl.FullNodeAPI) + miner := sn[0] + s := connectAndStartMining(t, b, blocktime, client, miner) + defer s.blockMiner.Stop() + + // Starts a deal and waits until it's published + runDealTillSeal := func(rseed int) { + res, _, err := CreateClientFile(s.ctx, s.client, rseed) + require.NoError(t, err) + + dc := startDeal(t, s.ctx, s.miner, s.client, res.Root, false, startEpoch) + waitDealSealed(t, s.ctx, s.miner, s.client, dc, false) + } + + // Run maxDealsPerMsg+1 deals in parallel + done := make(chan struct{}, maxDealsPerMsg+1) + for rseed := 1; rseed <= int(maxDealsPerMsg+1); rseed++ { + rseed := rseed + go func() { + runDealTillSeal(rseed) + done <- struct{}{} + }() + } + + // Wait for maxDealsPerMsg of the deals to be published + for i := 0; i < int(maxDealsPerMsg); i++ { + <-done + } + + sl, err := sn[0].SectorsList(s.ctx) + require.NoError(t, err) + require.GreaterOrEqual(t, len(sl), 4) + require.LessOrEqual(t, len(sl), 5) +} + func TestFastRetrievalDealFlow(t *testing.T, b APIBuilder, blocktime time.Duration, startEpoch abi.ChainEpoch) { s := setupOneClientOneMiner(t, b, blocktime) defer s.blockMiner.Stop() diff --git a/extern/storage-sealing/fsm.go b/extern/storage-sealing/fsm.go index fd6b3200177..7b60efa68b7 100644 --- a/extern/storage-sealing/fsm.go +++ b/extern/storage-sealing/fsm.go @@ -300,7 +300,9 @@ func (m *Sealing) plan(events []statemachine.Event, state *SectorInfo) (func(sta */ - m.stats.updateSector(m.minerSectorID(state.SectorNumber), state.State) + if err := m.onUpdateSector(context.TODO(), state); err != nil { + log.Errorw("update sector stats", "error", err) + } switch state.State { // Happy path @@ -391,6 +393,37 @@ func (m *Sealing) plan(events []statemachine.Event, state *SectorInfo) (func(sta return nil, processed, nil } +func (m *Sealing) onUpdateSector(ctx context.Context, state *SectorInfo) error { + if m.getConfig == nil { + return nil // tests + } + + cfg, err := m.getConfig() + if err != nil { + return xerrors.Errorf("getting config: %w", err) + } + sp, err := m.currentSealProof(ctx) + if err != nil { + return xerrors.Errorf("getting seal proof type: %w", err) + } + + shouldUpdateInput := m.stats.updateSector(cfg, m.minerSectorID(state.SectorNumber), state.State) + + // trigger more input processing when we've dipped below max sealing limits + if shouldUpdateInput { + go func() { + m.inputLk.Lock() + defer m.inputLk.Unlock() + + if err := m.updateInput(ctx, sp); err != nil { + log.Errorf("%+v", err) + } + }() + } + + return nil +} + func planCommitting(events []statemachine.Event, state *SectorInfo) (uint64, error) { for i, event := range events { switch e := event.User.(type) { diff --git a/extern/storage-sealing/garbage.go b/extern/storage-sealing/garbage.go index 398040e6ed0..c8ec21a84d8 100644 --- a/extern/storage-sealing/garbage.go +++ b/extern/storage-sealing/garbage.go @@ -28,18 +28,13 @@ func (m *Sealing) PledgeSector(ctx context.Context) (storage.SectorRef, error) { return storage.SectorRef{}, xerrors.Errorf("getting seal proof type: %w", err) } - sid, err := m.sc.Next() + sid, err := m.createSector(ctx, cfg, spt) if err != nil { - return storage.SectorRef{}, xerrors.Errorf("generating sector number: %w", err) - } - sectorID := m.minerSector(spt, sid) - err = m.sealer.NewSector(ctx, sectorID) - if err != nil { - return storage.SectorRef{}, xerrors.Errorf("notifying sealer of the new sector: %w", err) + return storage.SectorRef{}, err } log.Infof("Creating CC sector %d", sid) - return sectorID, m.sectors.Send(uint64(sid), SectorStartCC{ + return m.minerSector(spt, sid), m.sectors.Send(uint64(sid), SectorStartCC{ ID: sid, SectorType: spt, }) diff --git a/extern/storage-sealing/input.go b/extern/storage-sealing/input.go index 564afbc8814..44d2e8275b4 100644 --- a/extern/storage-sealing/input.go +++ b/extern/storage-sealing/input.go @@ -16,6 +16,7 @@ import ( sectorstorage "github.com/filecoin-project/lotus/extern/sector-storage" "github.com/filecoin-project/lotus/extern/sector-storage/ffiwrapper" + "github.com/filecoin-project/lotus/extern/storage-sealing/sealiface" ) func (m *Sealing) handleWaitDeals(ctx statemachine.Context, sector SectorInfo) error { @@ -388,23 +389,36 @@ func (m *Sealing) tryCreateDealSector(ctx context.Context, sp abi.RegisteredSeal return nil } + sid, err := m.createSector(ctx, cfg, sp) + if err != nil { + return err + } + + log.Infow("Creating sector", "number", sid, "type", "deal", "proofType", sp) + return m.sectors.Send(uint64(sid), SectorStart{ + ID: sid, + SectorType: sp, + }) +} + +// call with m.inputLk +func (m *Sealing) createSector(ctx context.Context, cfg sealiface.Config, sp abi.RegisteredSealProof) (abi.SectorNumber, error) { // Now actually create a new sector sid, err := m.sc.Next() if err != nil { - return xerrors.Errorf("getting sector number: %w", err) + return 0, xerrors.Errorf("getting sector number: %w", err) } err = m.sealer.NewSector(ctx, m.minerSector(sp, sid)) if err != nil { - return xerrors.Errorf("initializing sector: %w", err) + return 0, xerrors.Errorf("initializing sector: %w", err) } - log.Infow("Creating sector", "number", sid, "type", "deal", "proofType", sp) - return m.sectors.Send(uint64(sid), SectorStart{ - ID: sid, - SectorType: sp, - }) + // update stats early, fsm planner would do that async + m.stats.updateSector(cfg, m.minerSectorID(sid), UndefinedSectorState) + + return sid, nil } func (m *Sealing) StartPacking(sid abi.SectorNumber) error { diff --git a/extern/storage-sealing/stats.go b/extern/storage-sealing/stats.go index 10852937572..2688d849405 100644 --- a/extern/storage-sealing/stats.go +++ b/extern/storage-sealing/stats.go @@ -4,6 +4,7 @@ import ( "sync" "github.com/filecoin-project/go-state-types/abi" + "github.com/filecoin-project/lotus/extern/storage-sealing/sealiface" ) type statSectorState int @@ -23,10 +24,14 @@ type SectorStats struct { totals [nsst]uint64 } -func (ss *SectorStats) updateSector(id abi.SectorID, st SectorState) { +func (ss *SectorStats) updateSector(cfg sealiface.Config, id abi.SectorID, st SectorState) (updateInput bool) { ss.lk.Lock() defer ss.lk.Unlock() + preSealing := ss.curSealingLocked() + preStaging := ss.curStagingLocked() + + // update totals oldst, found := ss.bySector[id] if found { ss.totals[oldst]-- @@ -35,6 +40,34 @@ func (ss *SectorStats) updateSector(id abi.SectorID, st SectorState) { sst := toStatState(st) ss.bySector[id] = sst ss.totals[sst]++ + + // check if we may need be able to process more deals + sealing := ss.curSealingLocked() + staging := ss.curStagingLocked() + + log.Debugw("sector stats", "sealing", sealing, "staging", staging) + + if cfg.MaxSealingSectorsForDeals > 0 && // max sealing deal sector limit set + preSealing >= cfg.MaxSealingSectorsForDeals && // we were over limit + sealing < cfg.MaxSealingSectorsForDeals { // and we're below the limit now + updateInput = true + } + + if cfg.MaxWaitDealsSectors > 0 && // max waiting deal sector limit set + preStaging >= cfg.MaxWaitDealsSectors && // we were over limit + staging < cfg.MaxWaitDealsSectors { // and we're below the limit now + updateInput = true + } + + return updateInput +} + +func (ss *SectorStats) curSealingLocked() uint64 { + return ss.totals[sstStaging] + ss.totals[sstSealing] + ss.totals[sstFailed] +} + +func (ss *SectorStats) curStagingLocked() uint64 { + return ss.totals[sstStaging] } // return the number of sectors currently in the sealing pipeline @@ -42,7 +75,7 @@ func (ss *SectorStats) curSealing() uint64 { ss.lk.Lock() defer ss.lk.Unlock() - return ss.totals[sstStaging] + ss.totals[sstSealing] + ss.totals[sstFailed] + return ss.curSealingLocked() } // return the number of sectors waiting to enter the sealing pipeline @@ -50,5 +83,5 @@ func (ss *SectorStats) curStaging() uint64 { ss.lk.Lock() defer ss.lk.Unlock() - return ss.totals[sstStaging] + return ss.curStagingLocked() } diff --git a/node/node_test.go b/node/node_test.go index fb1f1e810b8..a246ff65bbc 100644 --- a/node/node_test.go +++ b/node/node_test.go @@ -58,6 +58,9 @@ func TestAPIDealFlow(t *testing.T) { t.Run("TestPublishDealsBatching", func(t *testing.T) { test.TestPublishDealsBatching(t, builder.MockSbBuilder, blockTime, dealStartEpoch) }) + t.Run("TestBatchDealInput", func(t *testing.T) { + test.TestBatchDealInput(t, builder.MockSbBuilder, blockTime, dealStartEpoch) + }) } func TestAPIDealFlowReal(t *testing.T) {