Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: curio: Keep more sector metadata in the DB long-term #11933

Merged
merged 10 commits into from
May 7, 2024
2 changes: 1 addition & 1 deletion cmd/curio/guidedsetup/guidedsetup.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,7 @@ func configToDB(d *MigrationData) {

chainApiInfo := fmt.Sprintf("%s:%s", string(token), ainfo.Addr)

d.MinerID, err = SaveConfigToLayer(d.MinerConfigPath, chainApiInfo)
d.MinerID, err = SaveConfigToLayerMigrateSectors(d.MinerConfigPath, chainApiInfo)
if err != nil {
d.say(notice, "Error saving config to layer: %s. Aborting Migration", err.Error())
os.Exit(1)
Expand Down
180 changes: 176 additions & 4 deletions cmd/curio/guidedsetup/shared.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,23 +4,29 @@ import (
"bytes"
"context"
"encoding/base64"
"encoding/json"
"fmt"
"os"
"path"
"strings"

"github.com/BurntSushi/toml"
"github.com/ipfs/go-cid"
"github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/namespace"
"github.com/samber/lo"
"golang.org/x/xerrors"

"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-statestore"

"github.com/filecoin-project/lotus/cmd/curio/deps"
"github.com/filecoin-project/lotus/lib/harmony/harmonydb"
"github.com/filecoin-project/lotus/lib/must"
"github.com/filecoin-project/lotus/node/config"
"github.com/filecoin-project/lotus/node/modules"
"github.com/filecoin-project/lotus/node/repo"
sealing "github.com/filecoin-project/lotus/storage/pipeline"
)

const (
Expand All @@ -29,7 +35,7 @@ const (

const FlagMinerRepoDeprecation = "storagerepo"

func SaveConfigToLayer(minerRepoPath, chainApiInfo string) (minerAddress address.Address, err error) {
func SaveConfigToLayerMigrateSectors(minerRepoPath, chainApiInfo string) (minerAddress address.Address, err error) {
_, say := SetupLanguage()
ctx := context.Background()

Expand Down Expand Up @@ -97,9 +103,6 @@ func SaveConfigToLayer(minerRepoPath, chainApiInfo string) (minerAddress address
if err != nil {
return minerAddress, xerrors.Errorf("opening miner metadata datastore: %w", err)
}
defer func() {
// _ = mmeta.Close()
}()

maddrBytes, err := mmeta.Get(ctx, datastore.NewKey("miner-address"))
if err != nil {
Expand All @@ -111,6 +114,12 @@ func SaveConfigToLayer(minerRepoPath, chainApiInfo string) (minerAddress address
return minerAddress, xerrors.Errorf("parsing miner actor address: %w", err)
}

if err := MigrateSectors(ctx, addr, mmeta, db, func(nSectors int) {
say(plain, "Migrating metadata for %d sectors.", nSectors)
}); err != nil {
return address.Address{}, xerrors.Errorf("migrating sectors: %w", err)
}

minerAddress = addr

curioCfg.Addresses = []config.CurioAddresses{{
Expand Down Expand Up @@ -256,3 +265,166 @@ func ensureEmptyArrays(cfg *config.CurioConfig) {
cfg.Apis.ChainApiInfo = []string{}
}
}

func cidPtrToStrptr(c *cid.Cid) *string {
if c == nil {
return nil
}
s := c.String()
return &s
}

func coalescePtrs[A any](a, b *A) *A {
if a != nil {
return a
}
return b
}

func MigrateSectors(ctx context.Context, maddr address.Address, mmeta datastore.Batching, db *harmonydb.DB, logMig func(int)) error {
mid, err := address.IDFromAddress(maddr)
if err != nil {
return xerrors.Errorf("getting miner ID: %w", err)
}

sts := statestore.New(namespace.Wrap(mmeta, datastore.NewKey(sealing.SectorStorePrefix)))

var sectors []sealing.SectorInfo
if err := sts.List(&sectors); err != nil {
return xerrors.Errorf("getting sector list: %w", err)
}

logMig(len(sectors))

migratableState := func(state sealing.SectorState) bool {
switch state {
case sealing.Proving, sealing.Available, sealing.Removed:
return true
default:
return false
}
}

unmigratable := map[sealing.SectorState]int{}

for _, sector := range sectors {
if !migratableState(sector.State) {
unmigratable[sector.State]++
continue
}
}

if len(unmigratable) > 0 {
fmt.Println("The following sector states are not migratable:")
for state, count := range unmigratable {
fmt.Printf(" %s: %d\n", state, count)
}

if os.Getenv("CURIO_SKIP_UNMIGRATABLE_SECTORS") != "1" {
magik6k marked this conversation as resolved.
Show resolved Hide resolved
return xerrors.Errorf("cannot migrate sectors with these states. Set CURIO_SKIP_UNMIGRATABLE_SECTORS=1 to proceed anyway")
}
}

for _, sector := range sectors {
if !migratableState(sector.State) || sector.State == sealing.Removed {
continue
}

// Insert sector metadata
_, err := db.Exec(ctx, `
INSERT INTO sectors_meta (sp_id, sector_num, reg_seal_proof, ticket_epoch, ticket_value,
orig_sealed_cid, orig_unsealed_cid, cur_sealed_cid, cur_unsealed_cid,
msg_cid_precommit, msg_cid_commit, msg_cid_update, seed_epoch, seed_value)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14)
ON CONFLICT (sp_id, sector_num) DO UPDATE
SET reg_seal_proof = excluded.reg_seal_proof, ticket_epoch = excluded.ticket_epoch, ticket_value = excluded.ticket_value,
orig_sealed_cid = excluded.orig_sealed_cid, orig_unsealed_cid = excluded.orig_unsealed_cid, cur_sealed_cid = excluded.cur_sealed_cid,
cur_unsealed_cid = excluded.cur_unsealed_cid, msg_cid_precommit = excluded.msg_cid_precommit, msg_cid_commit = excluded.msg_cid_commit,
msg_cid_update = excluded.msg_cid_update, seed_epoch = excluded.seed_epoch, seed_value = excluded.seed_value`,
mid,
sector.SectorNumber,
sector.SectorType,
sector.TicketEpoch,
sector.TicketValue,
cidPtrToStrptr(sector.CommR),
cidPtrToStrptr(sector.CommD),
cidPtrToStrptr(coalescePtrs(sector.UpdateSealed, sector.CommR)),
cidPtrToStrptr(coalescePtrs(sector.UpdateUnsealed, sector.CommD)),
cidPtrToStrptr(sector.PreCommitMessage),
cidPtrToStrptr(sector.CommitMessage),
cidPtrToStrptr(sector.ReplicaUpdateMessage),
sector.SeedEpoch,
sector.SeedValue,
)
if err != nil {
b, _ := json.MarshalIndent(sector, "", " ")
fmt.Println(string(b))

return xerrors.Errorf("inserting/updating sectors_meta for sector %d: %w", sector.SectorNumber, err)
}

// Process each piece within the sector
for j, piece := range sector.Pieces {
dealID := int64(0)
startEpoch := int64(0)
endEpoch := int64(0)
var pamJSON *string

if piece.HasDealInfo() {
dealInfo := piece.DealInfo()
if dealInfo.Impl().DealProposal != nil {
dealID = int64(dealInfo.Impl().DealID)
}

startEpoch = int64(must.One(dealInfo.StartEpoch()))
endEpoch = int64(must.One(dealInfo.EndEpoch()))
if piece.Impl().PieceActivationManifest != nil {
pam, err := json.Marshal(piece.Impl().PieceActivationManifest)
if err != nil {
return xerrors.Errorf("error marshalling JSON for piece %d in sector %d: %w", j, sector.SectorNumber, err)
}
ps := string(pam)
pamJSON = &ps
}
}

// Splitting the SQL statement for readability and adding new fields
_, err = db.Exec(ctx, `
INSERT INTO sectors_meta_pieces (
sp_id, sector_num, piece_num, piece_cid, piece_size,
requested_keep_data, raw_data_size, start_epoch, orig_end_epoch,
f05_deal_id, ddo_pam
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)
ON CONFLICT (sp_id, sector_num, piece_num) DO UPDATE
SET
piece_cid = excluded.piece_cid,
piece_size = excluded.piece_size,
requested_keep_data = excluded.requested_keep_data,
raw_data_size = excluded.raw_data_size,
start_epoch = excluded.start_epoch,
orig_end_epoch = excluded.orig_end_epoch,
f05_deal_id = excluded.f05_deal_id,
ddo_pam = excluded.ddo_pam`,
mid,
sector.SectorNumber,
j,
piece.PieceCID(),
piece.Piece().Size,
piece.HasDealInfo(),
nil, // raw_data_size might be calculated based on the piece size, or retrieved if available
startEpoch,
endEpoch,
dealID,
pamJSON,
)
if err != nil {
b, _ := json.MarshalIndent(sector, "", " ")
fmt.Println(string(b))

return xerrors.Errorf("inserting/updating sector_meta_pieces for sector %d, piece %d: %w", sector.SectorNumber, j, err)
}
}
}

return nil
}
1 change: 0 additions & 1 deletion cmd/curio/migrate.go

This file was deleted.

73 changes: 73 additions & 0 deletions cmd/curio/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package main
import (
"fmt"

"github.com/ipfs/go-datastore"
"github.com/urfave/cli/v2"
"golang.org/x/xerrors"

Expand All @@ -13,15 +14,18 @@ import (
"github.com/filecoin-project/lotus/chain/types"
lcli "github.com/filecoin-project/lotus/cli"
"github.com/filecoin-project/lotus/cmd/curio/deps"
"github.com/filecoin-project/lotus/cmd/curio/guidedsetup"
"github.com/filecoin-project/lotus/curiosrc/seal"
"github.com/filecoin-project/lotus/lib/harmony/harmonydb"
"github.com/filecoin-project/lotus/node/repo"
)

var sealCmd = &cli.Command{
Name: "seal",
Usage: "Manage the sealing pipeline",
Subcommands: []*cli.Command{
sealStartCmd,
sealMigrateLMSectorsCmd,
},
}

Expand Down Expand Up @@ -133,3 +137,72 @@ var sealStartCmd = &cli.Command{
return nil
},
}

var sealMigrateLMSectorsCmd = &cli.Command{
Name: "migrate-lm-sectors",
Usage: "(debug tool) Copy LM sector metadata into Curio DB",
Hidden: true, // only needed in advanced cases where manual repair is needed
Flags: []cli.Flag{
&cli.StringFlag{
Name: "miner-repo",
Usage: "Path to miner repo",
Value: "~/.lotusminer",
},
},
Action: func(cctx *cli.Context) error {
ctx := lcli.ReqContext(cctx)
db, err := deps.MakeDB(cctx)
if err != nil {
return err
}

r, err := repo.NewFS(cctx.String("miner-repo"))
if err != nil {
return err
}

ok, err := r.Exists()
if err != nil {
return err
}

if !ok {
return fmt.Errorf("repo not initialized at: %s", cctx.String("miner-repo"))
}

lr, err := r.LockRO(repo.StorageMiner)
if err != nil {
return fmt.Errorf("locking repo: %w", err)
}
defer func() {
err = lr.Close()
if err != nil {
fmt.Println("error closing repo: ", err)
}
}()

mmeta, err := lr.Datastore(ctx, "/metadata")
if err != nil {
return xerrors.Errorf("opening miner metadata datastore: %w", err)
}

maddrBytes, err := mmeta.Get(ctx, datastore.NewKey("miner-address"))
if err != nil {
return xerrors.Errorf("getting miner address datastore entry: %w", err)
}

addr, err := address.NewFromBytes(maddrBytes)
if err != nil {
return xerrors.Errorf("parsing miner actor address: %w", err)
}

err = guidedsetup.MigrateSectors(ctx, addr, mmeta, db, func(n int) {
fmt.Printf("Migrating %d sectors\n", n)
})
if err != nil {
return xerrors.Errorf("migrating sectors: %w", err)
}

return nil
},
}
3 changes: 2 additions & 1 deletion curiosrc/market/deal_ingest.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,8 @@ func (p *PieceIngester) AllocatePieceToSector(ctx context.Context, maddr address
mid, n, 0,
piece.DealProposal.PieceCID, piece.DealProposal.PieceSize,
source.String(), dataHdrJson, rawSize, !piece.KeepUnsealed,
piece.PublishCid, piece.DealID, dealProposalJson, piece.DealSchedule.StartEpoch, piece.DealSchedule.EndEpoch)
piece.PublishCid, piece.DealID, dealProposalJson,
piece.DealSchedule.StartEpoch, piece.DealSchedule.EndEpoch)
if err != nil {
return false, xerrors.Errorf("inserting into sectors_sdr_initial_pieces: %w", err)
}
Expand Down
Loading