Skip to content

Commit

Permalink
local index directory: recover tool (#1410)
Browse files Browse the repository at this point in the history
* initial disaster recovery tool for LID

* wip

* do not block on individual error

* instantiate lid

* report

* catch signal

* fixup

* comment out sector already in progress

* fixup

* start containers with init: true

* record that we dont have an unsealed copy

* match deals with boost sqlite db and piece store

* fixup

* fixup

* use logger

* fixup

* disable stacktrace

* fixup

* extract piece store away from disaster recovery struct

* add more sanity checks

* compare IsUnsealed vs storage find

* improve safeIsUnseal

* fixup

* better logs

* expand repodir

* calc properly next offset

* fixup

* add sector id to logs

* incr offset

* break after finding expired deal

* more logs

* fewer logs

* better logs

* better error

* refactor

* refactor minerApi

* better logs

* add time around add index

* pd.Start
  • Loading branch information
nonsense authored and LexLuthr committed Jul 20, 2023
1 parent 92a9606 commit 6bc7eb9
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 124 deletions.
1 change: 1 addition & 0 deletions cmd/boostd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ func main() {
dagstoreCmd,
netCmd,
pieceDirCmd,
recoverCmd,
},
}
app.Setup()
Expand Down
9 changes: 4 additions & 5 deletions cmd/boostd/recover.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import (
"github.com/filecoin-project/boost/cmd/lib"
"github.com/filecoin-project/boost/db"
"github.com/filecoin-project/boost/piecedirectory"
bdclient "github.com/filecoin-project/boostd-data/client"
"github.com/filecoin-project/boostd-data/model"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-commp-utils/writer"
Expand Down Expand Up @@ -191,14 +190,14 @@ func action(cctx *cli.Context) error {
if ignoreLID {
pd = nil
} else {
cl := bdclient.NewStore()
defer cl.Close(ctx)
err = cl.Dial(ctx, cctx.String("api-lid"))
pdClient := piecedirectory.NewStore()
defer pdClient.Close(ctx)
err = pdClient.Dial(ctx, cctx.String("api-lid"))
if err != nil {
return fmt.Errorf("connecting to local index directory service: %w", err)
}
pr := &piecedirectory.SectorAccessorAsPieceReader{SectorAccessor: sa}
pd = piecedirectory.NewPieceDirectory(cl, pr, cctx.Int("add-index-throttle"))
pd = piecedirectory.NewPieceDirectory(pdClient, pr, cctx.Int("add-index-throttle"))
pd.Start(ctx)
}

Expand Down
126 changes: 7 additions & 119 deletions cmd/migrate-lid/migrate_lid.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,27 +10,20 @@ import (
"strings"
"time"

"github.com/filecoin-project/boost-gfm/piecestore"
"github.com/filecoin-project/boost/cmd/lib"
"github.com/filecoin-project/boost/db"
"github.com/filecoin-project/boostd-data/couchbase"
"github.com/filecoin-project/boostd-data/ldb"
"github.com/filecoin-project/boostd-data/model"
"github.com/filecoin-project/boostd-data/svc"
"github.com/filecoin-project/boostd-data/svc/types"
"github.com/filecoin-project/go-address"
vfsm "github.com/filecoin-project/go-ds-versioning/pkg/fsm"
"github.com/filecoin-project/go-fil-markets/piecestore"
piecestoreimpl "github.com/filecoin-project/go-fil-markets/piecestore/impl"
"github.com/filecoin-project/go-fil-markets/retrievalmarket"
"github.com/filecoin-project/go-fil-markets/storagemarket"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/go-statemachine/fsm"
lcli "github.com/filecoin-project/lotus/cli"
"github.com/filecoin-project/lotus/lib/backupds"
"github.com/filecoin-project/lotus/node/modules"
"github.com/filecoin-project/lotus/node/repo"
"github.com/ipfs/go-cid"
"github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/namespace"
"github.com/ipld/go-car/v2/index"
"github.com/mitchellh/go-homedir"
"github.com/multiformats/go-multicodec"
Expand Down Expand Up @@ -347,7 +340,7 @@ func migrateIndex(ctx context.Context, ipath idxPath, store StoreMigrationApi, f

func migratePieceStore(ctx context.Context, logger *zap.SugaredLogger, bar *progressbar.ProgressBar, repoDir string, store StoreMigrationApi) (int, error) {
// Open the datastore in the existing repo
ds, err := openDataStore(repoDir)
ds, err := lib.OpenDataStore(repoDir)
if err != nil {
return 0, fmt.Errorf("creating piece store from repo %s: %w", repoDir, err)
}
Expand All @@ -364,12 +357,12 @@ func migratePieceStore(ctx context.Context, logger *zap.SugaredLogger, bar *prog
// Create a mapping of on-chain deal ID to deal proposal cid.
// This is needed below so that we can map from the legacy piece store
// info to a legacy deal.
propCidByChainDealID, err := getPropCidByChainDealID(ctx, ds)
propCidByChainDealID, err := lib.GetPropCidByChainDealID(ctx, ds)
if err != nil {
return 0, fmt.Errorf("building chain deal id -> proposal cid map: %w", err)
}

ps, err := openPieceStore(ctx, ds)
ps, err := lib.OpenPieceStore(ctx, ds)
if err != nil {
return 0, fmt.Errorf("opening piece store: %w", err)
}
Expand Down Expand Up @@ -484,81 +477,6 @@ func migratePieceStore(ctx context.Context, logger *zap.SugaredLogger, bar *prog
return errorCount, nil
}

func getPropCidByChainDealID(ctx context.Context, ds *backupds.Datastore) (map[abi.DealID]cid.Cid, error) {
deals, err := getLegacyDealsFSM(ctx, ds)
if err != nil {
return nil, err
}

// Build a mapping of chain deal ID to proposal CID
var list []storagemarket.MinerDeal
if err := deals.List(&list); err != nil {
return nil, err
}

byChainDealID := make(map[abi.DealID]cid.Cid, len(list))
for _, d := range list {
if d.DealID != 0 {
byChainDealID[d.DealID] = d.ProposalCid
}
}

return byChainDealID, nil
}

func getLegacyDealsFSM(ctx context.Context, ds *backupds.Datastore) (fsm.Group, error) {
// Get the deals FSM
provDS := namespace.Wrap(ds, datastore.NewKey("/deals/provider"))
deals, migrate, err := vfsm.NewVersionedFSM(provDS, fsm.Parameters{
StateType: storagemarket.MinerDeal{},
StateKeyField: "State",
}, nil, "2")
if err != nil {
return nil, fmt.Errorf("reading legacy deals from datastore: %w", err)
}

err = migrate(ctx)
if err != nil {
return nil, fmt.Errorf("running provider fsm migration script: %w", err)
}

return deals, err
}

func openDataStore(path string) (*backupds.Datastore, error) {
ctx := context.Background()

rpo, err := repo.NewFS(path)
if err != nil {
return nil, fmt.Errorf("could not open repo %s: %w", path, err)
}

exists, err := rpo.Exists()
if err != nil {
return nil, fmt.Errorf("checking repo %s exists: %w", path, err)
}
if !exists {
return nil, fmt.Errorf("repo does not exist: %s", path)
}

lr, err := rpo.Lock(repo.StorageMiner)
if err != nil {
return nil, fmt.Errorf("locking repo %s: %w", path, err)
}

mds, err := lr.Datastore(ctx, "/metadata")
if err != nil {
return nil, err
}

bds, err := backupds.Wrap(mds, "")
if err != nil {
return nil, fmt.Errorf("opening backupds: %w", err)
}

return bds, nil
}

func getRecords(subject index.Index) ([]model.Record, error) {
records := make([]model.Record, 0)

Expand Down Expand Up @@ -743,13 +661,13 @@ func migrateDBReverse(cctx *cli.Context, repoDir string, dbType string, pieceDir
logger.Infof("starting migration of %d piece infos from %s local index directory to piece store", len(pcids), dbType)

// Open the datastore
ds, err := openDataStore(repoDir)
ds, err := lib.OpenDataStore(repoDir)
if err != nil {
return fmt.Errorf("creating datastore from repo %s: %w", repoDir, err)
}

// Open the Piece Store
ps, err := openPieceStore(ctx, ds)
ps, err := lib.OpenPieceStore(ctx, ds)
if err != nil {
return fmt.Errorf("opening piece store: %w", err)
}
Expand Down Expand Up @@ -830,36 +748,6 @@ func migrateReversePiece(ctx context.Context, pieceCid cid.Cid, pieceDir StoreMi
return migrated, nil
}

func openPieceStore(ctx context.Context, ds *backupds.Datastore) (piecestore.PieceStore, error) {
// Open the piece store
ps, err := piecestoreimpl.NewPieceStore(namespace.Wrap(ds, datastore.NewKey("/storagemarket")))
if err != nil {
return nil, fmt.Errorf("creating piece store from datastore : %w", err)
}

// Wait for the piece store to be ready
ch := make(chan error, 1)
ps.OnReady(func(e error) {
ch <- e
})

err = ps.Start(ctx)
if err != nil {
return nil, fmt.Errorf("starting piece store: %w", err)
}

select {
case err = <-ch:
if err != nil {
return nil, fmt.Errorf("waiting for piece store to be ready: %w", err)
}
case <-ctx.Done():
return nil, errors.New("cancelled while waiting for piece store to be ready")
}

return ps, nil
}

func createLogger(logPath string) (*zap.SugaredLogger, error) {
logCfg := zap.NewDevelopmentConfig()
logCfg.OutputPaths = []string{logPath}
Expand Down

0 comments on commit 6bc7eb9

Please sign in to comment.