Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Chore: Backports to the release/v1.26.0 branch #11713

Merged
merged 5 commits into from
Mar 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -945,6 +945,7 @@ workflows:
- build
suite: itest-sector_pledge
target: "./itests/sector_pledge_test.go"
resource_class: 2xlarge
get-params: true

- test:
Expand Down
2 changes: 1 addition & 1 deletion .circleci/template.yml
Original file line number Diff line number Diff line change
Expand Up @@ -551,7 +551,7 @@ workflows:
- build
suite: itest-[[ $name ]]
target: "./itests/[[ $file ]]"
[[- if or (eq $name "worker") (eq $name "deals_concurrent") (eq $name "wdpost_worker_config")]]
[[- if or (eq $name "worker") (eq $name "deals_concurrent") (eq $name "wdpost_worker_config") (eq $name "sector_pledge")]]
resource_class: 2xlarge
[[- end]]
[[- if or (eq $name "wdpost") (eq $name "sector_pledge")]]
Expand Down
8 changes: 7 additions & 1 deletion itests/kit/ensemble_opts.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,13 @@ var DefaultEnsembleOpts = ensembleOpts{
}

// MockProofs activates mock proofs for the entire ensemble.
func MockProofs() EnsembleOpt {
func MockProofs(e ...bool) EnsembleOpt {
if len(e) > 0 && !e[0] {
return func(opts *ensembleOpts) error {
return nil
}
}

return func(opts *ensembleOpts) error {
opts.mockProofs = true
// since we're using mock proofs, we don't need to download
Expand Down
20 changes: 15 additions & 5 deletions itests/sector_pledge_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/stretchr/testify/require"

"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/go-state-types/big"
miner5 "github.com/filecoin-project/specs-actors/v5/actors/builtin/miner"

"github.com/filecoin-project/lotus/api"
Expand Down Expand Up @@ -39,7 +40,7 @@ func TestPledgeSectors(t *testing.T) {
defer cancel()

_, miner, ens := kit.EnsembleMinimal(t, kit.MockProofs())
ens.InterconnectAll().BeginMining(blockTime)
ens.InterconnectAll().BeginMiningMustPost(blockTime)

miner.PledgeSectors(ctx, nSectors, 0, nil)
}
Expand All @@ -65,12 +66,18 @@ func TestPledgeBatching(t *testing.T) {
//stm: @SECTOR_PRE_COMMIT_FLUSH_001, @SECTOR_COMMIT_FLUSH_001
blockTime := 50 * time.Millisecond

runTest := func(t *testing.T, nSectors int) {
runTest := func(t *testing.T, nSectors int, aggregate bool) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

client, miner, ens := kit.EnsembleMinimal(t, kit.MockProofs())
ens.InterconnectAll().BeginMining(blockTime)
kit.QuietMiningLogs()

client, miner, ens := kit.EnsembleMinimal(t, kit.MockProofs(!aggregate), kit.MutateSealingConfig(func(sc *config.SealingConfig) {
if aggregate {
sc.AggregateAboveBaseFee = types.FIL(big.Zero())
}
}))
ens.InterconnectAll().BeginMiningMustPost(blockTime)

client.WaitTillChain(ctx, kit.HeightAtLeast(10))

Expand Down Expand Up @@ -114,7 +121,10 @@ func TestPledgeBatching(t *testing.T) {
}

t.Run("100", func(t *testing.T) {
runTest(t, 100)
runTest(t, 100, false)
})
t.Run("10-agg", func(t *testing.T) {
runTest(t, 10, true)
})
}

Expand Down
2 changes: 1 addition & 1 deletion node/builder_chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ func ConfigFullNode(c interface{}) Option {
// If the Eth JSON-RPC is enabled, enable storing events at the ChainStore.
// This is the case even if real-time and historic filtering are disabled,
// as it enables us to serve logs in eth_getTransactionReceipt.
If(cfg.Fevm.EnableEthRPC, Override(StoreEventsKey, modules.EnableStoringEvents)),
If(cfg.Fevm.EnableEthRPC || cfg.Events.EnableActorEventsAPI, Override(StoreEventsKey, modules.EnableStoringEvents)),

Override(new(dtypes.ClientImportMgr), modules.ClientImportMgr),

Expand Down
19 changes: 9 additions & 10 deletions storage/pipeline/commit_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ func (b *CommitBatcher) maybeStartBatch(notif bool) ([]sealiface.CommitBatchRes,
return nil, xerrors.Errorf("getting config: %w", err)
}

if notif && total < cfg.MaxCommitBatch {
if notif && total < cfg.MaxCommitBatch && cfg.AggregateCommits {
return nil, nil
}

Expand All @@ -233,7 +233,7 @@ func (b *CommitBatcher) maybeStartBatch(notif bool) ([]sealiface.CommitBatchRes,
return false
}

individual := (total < cfg.MinCommitBatch) || (total < miner.MinAggregatedSectors) || blackedOut()
individual := (total < cfg.MinCommitBatch) || (total < miner.MinAggregatedSectors) || blackedOut() || !cfg.AggregateCommits

if !individual && !cfg.AggregateAboveBaseFee.Equals(big.Zero()) {
if ts.MinTicketBlock().ParentBaseFee.LessThan(cfg.AggregateAboveBaseFee) {
Expand Down Expand Up @@ -331,6 +331,9 @@ func (b *CommitBatcher) processBatchV2(cfg sealiface.Config, sectors []abi.Secto
return nil, err
}

// sort sectors by number
sort.Slice(sectors, func(i, j int) bool { return sectors[i] < sectors[j] })

total := len(sectors)

res := sealiface.CommitBatchRes{
Expand Down Expand Up @@ -371,10 +374,6 @@ func (b *CommitBatcher) processBatchV2(cfg sealiface.Config, sectors []abi.Secto
return nil, nil
}

sort.Slice(infos, func(i, j int) bool {
return infos[i].Number < infos[j].Number
})

proofs := make([][]byte, 0, total)
for _, info := range infos {
proofs = append(proofs, b.todo[info.Number].Proof)
Expand Down Expand Up @@ -444,13 +443,13 @@ func (b *CommitBatcher) processBatchV2(cfg sealiface.Config, sectors []abi.Secto
enc := new(bytes.Buffer)
if err := params.MarshalCBOR(enc); err != nil {
res.Error = err.Error()
return []sealiface.CommitBatchRes{res}, xerrors.Errorf("couldn't serialize ProveCommitSectors2Params: %w", err)
return []sealiface.CommitBatchRes{res}, xerrors.Errorf("couldn't serialize ProveCommitSectors3Params: %w", err)
}

_, err = simulateMsgGas(b.mctx, b.api, from, b.maddr, builtin.MethodsMiner.ProveCommitSectors3, needFunds, maxFee, enc.Bytes())

if err != nil && (!api.ErrorIsIn(err, []error{&api.ErrOutOfGas{}}) || len(sectors) < miner.MinAggregatedSectors*2) {
log.Errorf("simulating CommitBatch message failed: %s", err)
log.Errorf("simulating CommitBatch message failed (%x): %s", enc.Bytes(), err)
res.Error = err.Error()
return []sealiface.CommitBatchRes{res}, xerrors.Errorf("simulating CommitBatch message failed: %w", err)
}
Expand All @@ -474,7 +473,7 @@ func (b *CommitBatcher) processBatchV2(cfg sealiface.Config, sectors []abi.Secto

res.Msg = &mcid

log.Infow("Sent ProveCommitSectors2 message", "cid", mcid, "from", from, "todo", total, "sectors", len(infos))
log.Infow("Sent ProveCommitSectors3 message", "cid", mcid, "from", from, "todo", total, "sectors", len(infos))

return []sealiface.CommitBatchRes{res}, nil
}
Expand Down Expand Up @@ -591,7 +590,7 @@ func (b *CommitBatcher) processBatchV1(cfg sealiface.Config, sectors []abi.Secto
_, err = simulateMsgGas(b.mctx, b.api, from, b.maddr, builtin.MethodsMiner.ProveCommitAggregate, needFunds, maxFee, enc.Bytes())

if err != nil && (!api.ErrorIsIn(err, []error{&api.ErrOutOfGas{}}) || len(sectors) < miner.MinAggregatedSectors*2) {
log.Errorf("simulating CommitBatch message failed: %s", err)
log.Errorf("simulating CommitBatch message failed (%x): %s", enc.Bytes(), err)
res.Error = err.Error()
return []sealiface.CommitBatchRes{res}, xerrors.Errorf("simulating CommitBatch message failed: %w", err)
}
Expand Down
30 changes: 25 additions & 5 deletions storage/pipeline/fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"net/http"
"os"
"reflect"
"runtime"
"time"

"golang.org/x/xerrors"
Expand Down Expand Up @@ -39,8 +40,27 @@ func (m *Sealing) Plan(events []statemachine.Event, user interface{}) (interface
return nil, processed, nil
}

return func(ctx statemachine.Context, si SectorInfo) error {
err := next(ctx, si)
return func(ctx statemachine.Context, si SectorInfo) (err error) {
// handle panics
defer func() {
if r := recover(); r != nil {
buf := make([]byte, 1<<16)
n := runtime.Stack(buf, false)
buf = buf[:n]

l := Log{
Timestamp: uint64(time.Now().Unix()),
Message: fmt.Sprintf("panic: %v\n%s", r, buf),
Kind: "panic",
}
si.logAppend(l)

err = fmt.Errorf("panic: %v\n%s", r, buf)
}
}()

// execute the next state
err = next(ctx, si)
if err != nil {
log.Errorf("unhandled sector error (%d): %+v", si.SectorNumber, err)
return nil
Expand Down Expand Up @@ -127,8 +147,8 @@ var fsmPlanners = map[SectorState]func(events []statemachine.Event, state *Secto
),
Committing: planCommitting,
CommitFinalize: planOne(
on(SectorFinalized{}, SubmitCommit),
on(SectorFinalizedAvailable{}, SubmitCommit),
on(SectorFinalized{}, SubmitCommitAggregate),
on(SectorFinalizedAvailable{}, SubmitCommitAggregate),
on(SectorFinalizeFailed{}, CommitFinalizeFailed),
),
SubmitCommit: planOne(
Expand Down Expand Up @@ -674,7 +694,7 @@ func planCommitting(events []statemachine.Event, state *SectorInfo) (uint64, err
}
case SectorCommitted: // the normal case
e.apply(state)
state.State = SubmitCommit
state.State = SubmitCommitAggregate
case SectorProofReady: // early finalize
e.apply(state)
state.State = CommitFinalize
Expand Down
23 changes: 10 additions & 13 deletions storage/pipeline/fsm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,18 +70,18 @@ func TestHappyPath(t *testing.T) {
require.Equal(m.t, m.state.State, Committing)

m.planSingle(SectorCommitted{})
require.Equal(m.t, m.state.State, SubmitCommit)
require.Equal(m.t, m.state.State, SubmitCommitAggregate)

m.planSingle(SectorCommitSubmitted{})
require.Equal(m.t, m.state.State, CommitWait)
m.planSingle(SectorCommitAggregateSent{})
require.Equal(m.t, m.state.State, CommitAggregateWait)

m.planSingle(SectorProving{})
require.Equal(m.t, m.state.State, FinalizeSector)

m.planSingle(SectorFinalized{})
require.Equal(m.t, m.state.State, Proving)

expected := []SectorState{Packing, GetTicket, PreCommit1, PreCommit2, SubmitPreCommitBatch, PreCommitBatchWait, WaitSeed, Committing, SubmitCommit, CommitWait, FinalizeSector, Proving}
expected := []SectorState{Packing, GetTicket, PreCommit1, PreCommit2, SubmitPreCommitBatch, PreCommitBatchWait, WaitSeed, Committing, SubmitCommitAggregate, CommitAggregateWait, FinalizeSector, Proving}
for i, n := range notif {
if n.before.State != expected[i] {
t.Fatalf("expected before state: %s, got: %s", expected[i], n.before.State)
Expand Down Expand Up @@ -135,9 +135,6 @@ func TestHappyPathFinalizeEarly(t *testing.T) {
require.Equal(m.t, m.state.State, CommitFinalize)

m.planSingle(SectorFinalized{})
require.Equal(m.t, m.state.State, SubmitCommit)

m.planSingle(SectorSubmitCommitAggregate{})
require.Equal(m.t, m.state.State, SubmitCommitAggregate)

m.planSingle(SectorCommitAggregateSent{})
Expand All @@ -149,7 +146,7 @@ func TestHappyPathFinalizeEarly(t *testing.T) {
m.planSingle(SectorFinalized{})
require.Equal(m.t, m.state.State, Proving)

expected := []SectorState{Packing, GetTicket, PreCommit1, PreCommit2, SubmitPreCommitBatch, PreCommitBatchWait, WaitSeed, Committing, CommitFinalize, SubmitCommit, SubmitCommitAggregate, CommitAggregateWait, FinalizeSector, Proving}
expected := []SectorState{Packing, GetTicket, PreCommit1, PreCommit2, SubmitPreCommitBatch, PreCommitBatchWait, WaitSeed, Committing, CommitFinalize, SubmitCommitAggregate, CommitAggregateWait, FinalizeSector, Proving}
for i, n := range notif {
if n.before.State != expected[i] {
t.Fatalf("expected before state: %s, got: %s", expected[i], n.before.State)
Expand Down Expand Up @@ -188,9 +185,9 @@ func TestCommitFinalizeFailed(t *testing.T) {
require.Equal(m.t, m.state.State, CommitFinalize)

m.planSingle(SectorFinalized{})
require.Equal(m.t, m.state.State, SubmitCommit)
require.Equal(m.t, m.state.State, SubmitCommitAggregate)

expected := []SectorState{Committing, CommitFinalize, CommitFinalizeFailed, CommitFinalize, SubmitCommit}
expected := []SectorState{Committing, CommitFinalize, CommitFinalizeFailed, CommitFinalize, SubmitCommitAggregate}
for i, n := range notif {
if n.before.State != expected[i] {
t.Fatalf("expected before state: %s, got: %s", expected[i], n.before.State)
Expand Down Expand Up @@ -242,10 +239,10 @@ func TestSeedRevert(t *testing.T) {
// not changing the seed this time
_, _, err = m.s.plan([]statemachine.Event{{User: SectorSeedReady{SeedValue: nil, SeedEpoch: 5}}, {User: SectorCommitted{}}}, m.state)
require.NoError(t, err)
require.Equal(m.t, m.state.State, SubmitCommit)
require.Equal(m.t, m.state.State, SubmitCommitAggregate)

m.planSingle(SectorCommitSubmitted{})
require.Equal(m.t, m.state.State, CommitWait)
m.planSingle(SectorCommitAggregateSent{})
require.Equal(m.t, m.state.State, CommitAggregateWait)

m.planSingle(SectorProving{})
require.Equal(m.t, m.state.State, FinalizeSector)
Expand Down
26 changes: 20 additions & 6 deletions storage/pipeline/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,16 @@ func (m *Sealing) handleWaitDeals(ctx statemachine.Context, sector SectorInfo) e
for _, piece := range sector.Pieces {
used += piece.Piece().Size.Unpadded()

if !piece.HasDealInfo() {
continue
}

endEpoch, err := piece.EndEpoch()
if err != nil {
return xerrors.Errorf("piece.EndEpoch: %w", err)
}

if piece.HasDealInfo() && endEpoch > lastDealEnd {
if endEpoch > lastDealEnd {
lastDealEnd = endEpoch
}
}
Expand Down Expand Up @@ -953,20 +957,30 @@ func (m *Sealing) SectorsStatus(ctx context.Context, sid abi.SectorNumber, showO
return api.SectorInfo{}, err
}

nv, err := m.Api.StateNetworkVersion(ctx, types.EmptyTSK)
if err != nil {
return api.SectorInfo{}, xerrors.Errorf("getting network version: %w", err)
}

deals := make([]abi.DealID, len(info.Pieces))
pieces := make([]api.SectorPiece, len(info.Pieces))
for i, piece := range info.Pieces {
// todo make this work with DDO deals in some reasonable way

pieces[i].Piece = piece.Piece()
if !piece.HasDealInfo() || piece.Impl().PublishCid == nil {

if !piece.HasDealInfo() {
continue
}

pdi := piece.Impl()
if pdi.Valid(nv) != nil {
continue
}

pdi := piece.DealInfo().Impl() // copy
pieces[i].DealInfo = &pdi

deals[i] = piece.DealInfo().Impl().DealID
if pdi.PublishCid != nil {
deals[i] = pdi.DealID
}
}

log := make([]api.SectorLog, len(info.Log))
Expand Down
2 changes: 1 addition & 1 deletion storage/pipeline/sector_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ const (
CommitFinalizeFailed SectorState = "CommitFinalizeFailed"

// single commit
SubmitCommit SectorState = "SubmitCommit" // send commit message to the chain
SubmitCommit SectorState = "SubmitCommit" // send commit message to the chain (deprecated)
CommitWait SectorState = "CommitWait" // wait for the commit message to land on chain

SubmitCommitAggregate SectorState = "SubmitCommitAggregate"
Expand Down
Loading