storage: split window PoST submission into multiple messages #3689

Merged
merged 2 commits on Sep 15, 2020
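Note (not part of the PR): in rough terms, runPost now returns a slice of SubmitWindowedPoStParams, one per batch of partitions, and doPost submits each element as a separate message. A minimal standalone sketch of that flow; the type, fields, and values below are illustrative only, not the lotus API.

package main

import "fmt"

// Toy stand-in for miner.SubmitWindowedPoStParams.
type submitParams struct {
	Deadline   uint64
	Partitions []int
}

func main() {
	// Proving yields one params value per partition batch.
	posts := []submitParams{
		{Deadline: 3, Partitions: []int{0, 1}},
		{Deadline: 3, Partitions: []int{2, 3}},
		{Deadline: 3, Partitions: []int{4}},
	}

	// One SubmitWindowedPoSt message per batch; in the real scheduler this
	// step is s.submitPost(ctx, &posts[i]).
	for i := range posts {
		fmt.Printf("message %d: deadline %d, partitions %v\n", i, posts[i].Deadline, posts[i].Partitions)
	}
}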
271 changes: 168 additions & 103 deletions storage/wdpost_run.go
@@ -3,7 +3,6 @@ package storage
import (
"bytes"
"context"
"errors"
"time"

"github.com/filecoin-project/go-state-types/dline"
@@ -30,8 +29,6 @@ import (
"github.com/filecoin-project/lotus/journal"
)

-var errNoPartitions = errors.New("no partitions")

func (s *WindowPoStScheduler) failPost(err error, deadline *dline.Info) {
journal.J.RecordEvent(s.evtTypes[evtTypeWdPoStScheduler], func() interface{} {
return WdPoStSchedulerEvt{
@@ -79,23 +76,27 @@ func (s *WindowPoStScheduler) doPost(ctx context.Context, deadline *dline.Info,
})
}

-	proof, err := s.runPost(ctx, *deadline, ts)
-	switch err {
-	case errNoPartitions:
+	posts, err := s.runPost(ctx, *deadline, ts)
+	if err != nil {
+		log.Errorf("runPost failed: %+v", err)
+		s.failPost(err, deadline)
+		return
+	}
+
+	if len(posts) == 0 {
		recordProofsEvent(nil, cid.Undef)
		return
-	case nil:
-		sm, err := s.submitPost(ctx, proof)
+	}
+
+	for i := range posts {
+		post := &posts[i]
+		sm, err := s.submitPost(ctx, post)
		if err != nil {
			log.Errorf("submitPost failed: %+v", err)
			s.failPost(err, deadline)
-			return
+		} else {
+			recordProofsEvent(post.Partitions, sm.Cid())
		}
-		recordProofsEvent(proof.Partitions, sm.Cid())
-	default:
-		log.Errorf("runPost failed: %+v", err)
-		s.failPost(err, deadline)
-		return
	}

journal.J.RecordEvent(s.evtTypes[evtTypeWdPoStScheduler], func() interface{} {
@@ -327,7 +328,7 @@ func (s *WindowPoStScheduler) checkNextFaults(ctx context.Context, dlIdx uint64,
return faults, sm, nil
}

-func (s *WindowPoStScheduler) runPost(ctx context.Context, di dline.Info, ts *types.TipSet) (*miner.SubmitWindowedPoStParams, error) {
+func (s *WindowPoStScheduler) runPost(ctx context.Context, di dline.Info, ts *types.TipSet) ([]miner.SubmitWindowedPoStParams, error) {
ctx, span := trace.StartSpan(ctx, "storage.runPost")
defer span.End()

Expand Down Expand Up @@ -399,136 +400,200 @@ func (s *WindowPoStScheduler) runPost(ctx context.Context, di dline.Info, ts *ty
return nil, xerrors.Errorf("failed to get chain randomness for windowPost (ts=%d; deadline=%d): %w", ts.Height(), di, err)
}

// Get the partitions for the given deadline
partitions, err := s.api.StateMinerPartitions(ctx, s.actor, di.Index, ts.Key())
if err != nil {
return nil, xerrors.Errorf("getting partitions: %w", err)
}

-	params := &miner.SubmitWindowedPoStParams{
-		Deadline:   di.Index,
-		Partitions: make([]miner.PoStPartition, 0, len(partitions)),
-		Proofs:     nil,
+	// Split partitions into batches, so as not to exceed the number of sectors
+	// allowed in a single message
+	partitionBatches, err := s.batchPartitions(partitions)
+	if err != nil {
+		return nil, err
	}

-	skipCount := uint64(0)
-	postSkipped := bitfield.New()
-	var postOut []proof.PoStProof
+	// Generate proofs in batches
+	posts := make([]miner.SubmitWindowedPoStParams, 0, len(partitionBatches))
+	for batchIdx, batch := range partitionBatches {
+		batchPartitionStartIdx := 0
+		for _, batch := range partitionBatches[:batchIdx] {
+			batchPartitionStartIdx += len(batch)
+		}

Review comment (Member) on the inner loop: Why not just keep a running count?
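Note (not part of the diff): a standalone sketch of what the reviewer appears to suggest, carrying a running counter across batches instead of re-summing partitionBatches[:batchIdx] on every iteration. Names and values are illustrative.

package main

import "fmt"

func main() {
	// Hypothetical partition batches, here just slices of partition names.
	partitionBatches := [][]string{{"p0", "p1"}, {"p2", "p3"}, {"p4"}}

	// Running count: the global index of the first partition in each batch,
	// updated as the loop advances.
	startIdx := 0
	for batchIdx, batch := range partitionBatches {
		fmt.Printf("batch %d starts at global partition index %d\n", batchIdx, startIdx)
		startIdx += len(batch)
	}
}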

+		params := miner.SubmitWindowedPoStParams{
+			Deadline:   di.Index,
+			Partitions: make([]miner.PoStPartition, 0, len(batch)),
+			Proofs:     nil,
+		}

-	for retries := 0; retries < 5; retries++ {
-		var sinfos []proof.SectorInfo
-		sidToPart := map[abi.SectorNumber]int{}
-		for partIdx, partition := range partitions {
+		skipCount := uint64(0)
+		postSkipped := bitfield.New()
+		var postOut []proof.PoStProof
+		somethingToProve := true
+		for retries := 0; retries < 5; retries++ {
+			var sinfos []proof.SectorInfo
+			for partIdx, partition := range batch {
				// TODO: Can do this in parallel
				toProve, err := partition.ActiveSectors()
				if err != nil {
					return nil, xerrors.Errorf("getting active sectors: %w", err)
				}

				toProve, err = bitfield.MergeBitFields(toProve, partition.Recoveries)
				if err != nil {
					return nil, xerrors.Errorf("adding recoveries to set of sectors to prove: %w", err)
				}

				good, err := s.checkSectors(ctx, toProve)
				if err != nil {
					return nil, xerrors.Errorf("checking sectors to skip: %w", err)
				}

				good, err = bitfield.SubtractBitField(good, postSkipped)
				if err != nil {
					return nil, xerrors.Errorf("toProve - postSkipped: %w", err)
				}

				skipped, err := bitfield.SubtractBitField(toProve, good)
				if err != nil {
					return nil, xerrors.Errorf("toProve - good: %w", err)
				}

				sc, err := skipped.Count()
				if err != nil {
					return nil, xerrors.Errorf("getting skipped sector count: %w", err)
				}

				skipCount += sc

				ssi, err := s.sectorsForProof(ctx, good, partition.Sectors, ts)
				if err != nil {
					return nil, xerrors.Errorf("getting sorted sector info: %w", err)
				}

				if len(ssi) == 0 {
					continue
				}

				sinfos = append(sinfos, ssi...)
-				for _, si := range ssi {
-					sidToPart[si.SectorNumber] = partIdx
-				}
-
				params.Partitions = append(params.Partitions, miner.PoStPartition{
-					Index:   uint64(partIdx),
+					Index:   uint64(batchPartitionStartIdx + partIdx),
					Skipped: skipped,
				})
			}

			if len(sinfos) == 0 {
-				// nothing to prove..
-				return nil, errNoPartitions
+				// nothing to prove for this batch
+				somethingToProve = false
+				break
			}

+			// Generate proof
			log.Infow("running windowPost",
				"chain-random", rand,
				"deadline", di,
				"height", ts.Height(),
				"skipped", skipCount)

			tsStart := build.Clock.Now()

			mid, err := address.IDFromAddress(s.actor)
			if err != nil {
				return nil, err
			}

			var ps []abi.SectorID
			postOut, ps, err = s.prover.GenerateWindowPoSt(ctx, abi.ActorID(mid), sinfos, abi.PoStRandomness(rand))
			elapsed := time.Since(tsStart)

-			log.Infow("computing window PoSt", "elapsed", elapsed)
+			log.Infow("computing window PoSt", "batch", batchIdx, "elapsed", elapsed)

			if err == nil {
+				// Proof generation successful, stop retrying
				break
			}

+			// Proof generation failed, so retry

			if len(ps) == 0 {
				return nil, xerrors.Errorf("running post failed: %w", err)
			}

			log.Warnw("generate window PoSt skipped sectors", "sectors", ps, "error", err, "try", retries)

			skipCount += uint64(len(ps))
			for _, sector := range ps {
				postSkipped.Set(uint64(sector.Number))
			}
		}

+		// Nothing to prove for this batch, try the next batch
+		if !somethingToProve {
+			continue
+		}

		if len(postOut) == 0 {
			return nil, xerrors.Errorf("received no proofs back from generate window post")
		}

		params.Proofs = postOut

+		posts = append(posts, params)
+	}

// Compute randomness after generating proofs so as to reduce the impact
// of chain reorgs (which change randomness)
commEpoch := di.Open
commRand, err := s.api.ChainGetRandomnessFromTickets(ctx, ts.Key(), crypto.DomainSeparationTag_PoStChainCommit, commEpoch, nil)
if err != nil {
return nil, xerrors.Errorf("failed to get chain randomness for windowPost (ts=%d; deadline=%d): %w", ts.Height(), di, err)
return nil, xerrors.Errorf("failed to get chain randomness for windowPost (ts=%d; deadline=%d): %w", ts.Height(), commEpoch, err)
}

-	params.ChainCommitEpoch = commEpoch
-	params.ChainCommitRand = commRand
+	for i := range posts {
+		posts[i].ChainCommitEpoch = commEpoch
+		posts[i].ChainCommitRand = commRand
+	}

-	log.Infow("submitting window PoSt")
-
-	return params, nil
+	return posts, nil
}
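Note (not part of the diff): the retry loop in runPost tolerates bad sectors by collecting whatever the prover reports as skipped into postSkipped and excluding those sectors on the next attempt, up to five tries per batch. A self-contained sketch of that pattern, with a fake prover standing in for GenerateWindowPoSt; all names and values here are illustrative.

package main

import (
	"errors"
	"fmt"
)

// fakeProve stands in for the prover: it fails while sector 7 is present,
// reporting it as skipped, and succeeds otherwise.
func fakeProve(sectors []int) (string, []int, error) {
	for _, s := range sectors {
		if s == 7 {
			return "", []int{7}, errors.New("bad sector")
		}
	}
	return "proof", nil, nil
}

func main() {
	toProve := []int{5, 6, 7, 8}
	excluded := map[int]bool{}

	for retries := 0; retries < 5; retries++ {
		// Drop anything we already know the prover cannot handle.
		var attempt []int
		for _, s := range toProve {
			if !excluded[s] {
				attempt = append(attempt, s)
			}
		}

		proof, skipped, err := fakeProve(attempt)
		if err == nil {
			fmt.Println("generated", proof, "over sectors", attempt)
			return
		}

		// Mark the skipped sectors and retry without them.
		for _, s := range skipped {
			excluded[s] = true
		}
		fmt.Println("attempt", retries, "failed; skipping", skipped)
	}
}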
func (s *WindowPoStScheduler) batchPartitions(partitions []*miner.Partition) ([][]*miner.Partition, error) {
// Get the number of sectors allowed in a partition, for this proof size
sectorsPerPartition, err := builtin.PoStProofWindowPoStPartitionSectors(s.proofType)
if err != nil {
return nil, xerrors.Errorf("getting sectors per partition: %w", err)
}

// We don't want to exceed the number of sectors allowed in a message.
// So given the number of sectors in a partition, work out the number of
// partitions that can be in a message without exceeding sectors per
// message:
// floor(number of sectors allowed in a message / sectors per partition)
// eg:
// max sectors per message 7: ooooooo
// sectors per partition 3: ooo
// partitions per message 2: oooOOO
// <1><2> (3rd doesn't fit)
partitionsPerMsg := int(miner.AddressedSectorsMax / sectorsPerPartition)

// The number of messages will be:
// ceiling(number of partitions / partitions per message)
batchCount := len(partitions) / partitionsPerMsg
if len(partitions)%partitionsPerMsg != 0 {
batchCount++
}

// Split the partitions into batches
batches := make([][]*miner.Partition, 0, batchCount)
for i := 0; i < len(partitions); i += partitionsPerMsg {
end := i + partitionsPerMsg
if end > len(partitions) {
end = len(partitions)
}
batches = append(batches, partitions[i:end])
}
return batches, nil
}
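Note (not part of the diff): a worked example of the batching arithmetic above, reusing the 7-sectors-per-message, 3-sectors-per-partition numbers from the comment. The real values come from the miner actor's AddressedSectorsMax and builtin.PoStProofWindowPoStPartitionSectors for the proof type; the numbers here are only illustrative.

package main

import "fmt"

func main() {
	const maxSectorsPerMsg = 7
	const sectorsPerPartition = 3

	// floor(max sectors per message / sectors per partition) = 2
	partitionsPerMsg := maxSectorsPerMsg / sectorsPerPartition

	// ceiling(number of partitions / partitions per message)
	numPartitions := 5
	batchCount := (numPartitions + partitionsPerMsg - 1) / partitionsPerMsg

	fmt.Println("partitions per message:", partitionsPerMsg)                // 2
	fmt.Println("messages for", numPartitions, "partitions:", batchCount)   // 3
}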

func (s *WindowPoStScheduler) sectorsForProof(ctx context.Context, goodSectors, allSectors bitfield.BitField, ts *types.TipSet) ([]proof.SectorInfo, error) {