Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat:支持重建本地数据索引和缓存 #161

Merged
merged 2 commits into from
Jul 8, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
131 changes: 84 additions & 47 deletions api/impl/venus_market.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import (
"sort"
"time"

"github.com/filecoin-project/go-fil-markets/stores"

"github.com/ipfs/go-cid"
logging "github.com/ipfs/go-log/v2"
"github.com/libp2p/go-libp2p-core/host"
Expand Down Expand Up @@ -62,6 +64,7 @@ type MarketNodeImpl struct {
Messager clients2.IMixMessage
StorageAsk storageprovider.IStorageAsk
DAGStore *dagstore.DAGStore
DAGStoreWrapper stores.DAGStoreWrapper
PieceStorageMgr *piecestorage.PieceStorageManager
MinerMgr minermgr.IAddrMgr
PaychAPI *paychmgr.PaychAPI
Expand Down Expand Up @@ -472,81 +475,115 @@ func (m MarketNodeImpl) DagstoreListShards(ctx context.Context) ([]types.Dagstor
}

func (m MarketNodeImpl) DagstoreInitializeShard(ctx context.Context, key string) error {
k := shard.KeyFromString(key)

info, err := m.DAGStore.GetShardInfo(k)
//check whether key valid
cidKey, err := cid.Decode(key)
if err != nil {
return fmt.Errorf("failed to get shard info: %w", err)
}
if st := info.ShardState; st != dagstore.ShardStateNew {
return fmt.Errorf("cannot initialize shard; expected state ShardStateNew, was: %s", st.String())
return err
}

ch := make(chan dagstore.ShardResult, 1)
if err = m.DAGStore.AcquireShard(ctx, k, ch, dagstore.AcquireOpts{}); err != nil {
return fmt.Errorf("failed to acquire shard: %w", err)
_, err = m.Repo.StorageDealRepo().GetPieceInfo(ctx, cidKey)
if err != nil {
return err
}

var res dagstore.ShardResult
select {
case res = <-ch:
case <-ctx.Done():
return ctx.Err()
//check whether shard info exit
k := shard.KeyFromString(key)
info, err := m.DAGStore.GetShardInfo(k)
if err != nil && err != dagstore.ErrShardUnknown {
return fmt.Errorf("failed to get shard info: %w", err)
}

if err := res.Error; err != nil {
return fmt.Errorf("failed to acquire shard: %w", err)
if st := info.ShardState; st != dagstore.ShardStateNew {
return fmt.Errorf("cannot initialize shard; expected state ShardStateNew, was: %s", st.String())
}

if res.Accessor != nil {
err = res.Accessor.Close()
if err != nil {
log.Warnw("failed to close shard accessor; continuing", "shard_key", k, "error", err)
}
bs, err := m.DAGStoreWrapper.LoadShard(ctx, cidKey)
if err != nil {
return err
}

return nil
return bs.Close()
}

func (m MarketNodeImpl) DagstoreInitializeAll(ctx context.Context, params types.DagstoreInitializeAllParams) (<-chan types.DagstoreInitializeAllEvent, error) {
// prepare the thottler tokens.
var throttle chan struct{}
if c := params.MaxConcurrency; c > 0 {
throttle = make(chan struct{}, c)
for i := 0; i < c; i++ {
throttle <- struct{}{}
}
}

deals, err := m.Repo.StorageDealRepo().GetDealByAddrAndStatus(ctx, address.Undef, storageprovider.ReadyRetrievalDealStatus...)
if err != nil {
return nil, err
}
// are we initializing only unsealed pieces?
onlyUnsealed := !params.IncludeSealed

info := m.DAGStore.AllShardsInfo()
var toInitialize []string
for k, i := range info {
if i.ShardState != dagstore.ShardStateNew {
for _, deal := range deals {
pieceCid := deal.ClientDealProposal.Proposal.PieceCID
info, err := m.DAGStore.GetShardInfo(shard.KeyFromCID(pieceCid))
if err != nil && err != dagstore.ErrShardUnknown {
return nil, err
}

if info.ShardState != dagstore.ShardStateNew {
continue
}

// if we're initializing only unsealed pieces, check if there's an
// unsealed deal for this piece available.
if onlyUnsealed {
pieceCid, err := cid.Decode(k.String())
if err != nil {
log.Warnw("DagstoreInitializeAll: failed to decode shard key as piece CID; skipping", "shard_key", k.String(), "error", err)
continue
}

_, err = m.PieceStorageMgr.FindStorageForRead(ctx, pieceCid.String())
if err != nil {
//todounseal
log.Warnw("DagstoreInitializeAll: failed to get unsealed status; skipping deal", "piece cid", pieceCid, "error", err)
continue
}
}

//todo trigger unseal
// yes, we're initializing this shard.
toInitialize = append(toInitialize, k.String())
toInitialize = append(toInitialize, pieceCid.String())
}

return m.dagstoreLoadShards(ctx, toInitialize, params.MaxConcurrency)
}

func (m *MarketNodeImpl) DagstoreInitializeStorage(ctx context.Context, storageName string, params types.DagstoreInitializeAllParams) (<-chan types.DagstoreInitializeAllEvent, error) {
storage, err := m.PieceStorageMgr.GetPieceStorageByName(storageName)
if err != nil {
return nil, err
}
resourceIds, err := storage.ListResourceIds(ctx)
if err != nil {
return nil, err
}

var toInitialize []string
for _, resource := range resourceIds {
pieceCid, err := cid.Decode(resource)
if err != nil {
log.Warnf("resource name (%s) was not a valid piece cid %v", resource, err)
continue
}
pieceInfo, err := m.Repo.StorageDealRepo().GetPieceInfo(ctx, pieceCid)
if err != nil || (pieceInfo != nil && len(pieceInfo.Deals) == 0) {
log.Warnf("piece cid %s not in storage deals", pieceCid)
continue
}

_, err = m.DAGStore.GetShardInfo(shard.KeyFromString(resource))
if err != nil && !errors.Is(err, dagstore.ErrShardUnknown) {
return nil, err
}

toInitialize = append(toInitialize, resource)
}

return m.dagstoreLoadShards(ctx, toInitialize, params.MaxConcurrency)
}

func (m MarketNodeImpl) dagstoreLoadShards(ctx context.Context, toInitialize []string, concurrency int) (<-chan types.DagstoreInitializeAllEvent, error) {
// prepare the thottler tokens.
var throttle chan struct{}
if c := concurrency; c > 0 {
throttle = make(chan struct{}, c)
for i := 0; i < c; i++ {
throttle <- struct{}{}
}
}

total := len(toInitialize)
Expand Down Expand Up @@ -623,11 +660,8 @@ func (m MarketNodeImpl) DagstoreInitializeAll(ctx context.Context, params types.
}()

return res, nil

}

func (m MarketNodeImpl) DagstoreRecoverShard(ctx context.Context, key string) error {

k := shard.KeyFromString(key)

info, err := m.DAGStore.GetShardInfo(k)
Expand Down Expand Up @@ -716,6 +750,7 @@ func (m MarketNodeImpl) PaychVoucherList(ctx context.Context, pch address.Addres
return m.PaychAPI.PaychVoucherList(ctx, pch)
}

//ImportV1Data deprecated api
func (m MarketNodeImpl) ImportV1Data(ctx context.Context, src string) error {
type minerDealsIncludeStatus struct {
MinerDeal storagemarket.MinerDeal
Expand Down Expand Up @@ -819,10 +854,12 @@ func (m MarketNodeImpl) ImportV1Data(ctx context.Context, src string) error {
return nil
}

//GetReadUrl deprecated api
func (m MarketNodeImpl) GetReadUrl(ctx context.Context, s string) (string, error) {
panic("not support")
}

//GetWriteUrl deprecated api
func (m MarketNodeImpl) GetWriteUrl(ctx context.Context, s2 string) (string, error) {
panic("not support")
}
Expand Down
76 changes: 76 additions & 0 deletions cli/dagstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ var DagstoreCmd = &cli.Command{
dagstoreInitializeShardCmd,
dagstoreRecoverShardCmd,
dagstoreInitializeAllCmd,
dagstoreInitializeStorageCmd,
dagstoreGcCmd,
},
}
Expand Down Expand Up @@ -217,6 +218,81 @@ var dagstoreInitializeAllCmd = &cli.Command{
},
}

var dagstoreInitializeStorageCmd = &cli.Command{
Name: "initialize-storage",
Usage: "Initialize all uninitialized shards in specify piece storage",
Flags: []cli.Flag{
&cli.StringFlag{
Name: "storage",
Usage: "specify storage to scan for index",
Required: true,
},
&cli.UintFlag{
Name: "concurrency",
Usage: "maximum shards to initialize concurrently at a time; use 0 for unlimited",
Required: true,
},
&cli.BoolFlag{
Name: "include-sealed",
Usage: "initialize sealed pieces as well",
},
&cli.BoolFlag{
Name: "color",
Usage: "use color in display output",
DefaultText: "depends on output being a TTY",
},
},
Action: func(cctx *cli.Context) error {
if cctx.IsSet("color") {
color.NoColor = !cctx.Bool("color")
}

storageName := cctx.String("storage")
concurrency := cctx.Uint("concurrency")
sealed := cctx.Bool("sealed")

marketsApi, closer, err := NewMarketNode(cctx)
if err != nil {
return err
}
defer closer()
ctx := ReqContext(cctx)

params := types.DagstoreInitializeAllParams{
MaxConcurrency: int(concurrency),
IncludeSealed: sealed,
}

ch, err := marketsApi.DagstoreInitializeStorage(ctx, storageName, params)
if err != nil {
return err
}

for {
select {
case evt, ok := <-ch:
if !ok {
return nil
}
_, _ = fmt.Fprint(os.Stdout, color.New(color.BgHiBlack).Sprintf("(%d/%d)", evt.Current, evt.Total))
_, _ = fmt.Fprint(os.Stdout, " ")
if evt.Event == "start" {
_, _ = fmt.Fprintln(os.Stdout, evt.Key, color.New(color.Reset).Sprint("STARTING"))
} else {
if evt.Success {
_, _ = fmt.Fprintln(os.Stdout, evt.Key, color.New(color.FgGreen).Sprint("SUCCESS"))
} else {
_, _ = fmt.Fprintln(os.Stdout, evt.Key, color.New(color.FgRed).Sprint("ERROR"), evt.Error)
}
}

case <-ctx.Done():
return fmt.Errorf("aborted")
}
}
},
}

var dagstoreGcCmd = &cli.Command{
Name: "gc",
Usage: "Garbage collect the dagstore",
Expand Down
8 changes: 4 additions & 4 deletions dagstore/wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ func NewDAGStore(ctx context.Context, cfg *config.DAGStoreConfig, marketApi Mark
return nil, nil, fmt.Errorf("failed to initialise dagstore index repo")
}

dcfg := dagstore.Config{
dCfg := dagstore.Config{
TransientsDir: transientsDir,
IndexRepo: irepo,
Datastore: dstore,
Expand All @@ -105,15 +105,15 @@ func NewDAGStore(ctx context.Context, cfg *config.DAGStoreConfig, marketApi Mark
}

if cfg.MongoTopIndex != nil {
dcfg.TopLevelIndex, err = NewMongoTopIndex(ctx, cfg.MongoTopIndex.Url)
dCfg.TopLevelIndex, err = NewMongoTopIndex(ctx, cfg.MongoTopIndex.Url)
if err != nil {
return nil, nil, err
}
} else {
dcfg.TopLevelIndex = index.NewInverted(dstore)
dCfg.TopLevelIndex = index.NewInverted(dstore)
}

dagst, err := dagstore.NewDAGStore(dcfg)
dagst, err := dagstore.NewDAGStore(dCfg)
if err != nil {
return nil, nil, fmt.Errorf("failed to create DAG store: %w", err)
}
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ require (
github.com/filecoin-project/specs-actors/v7 v7.0.1
github.com/filecoin-project/specs-actors/v8 v8.0.1
github.com/filecoin-project/specs-storage v0.4.1
github.com/filecoin-project/venus v1.6.1-0.20220707095905-e14afe441234
github.com/filecoin-project/venus v1.6.1-0.20220708084319-d8da3536c71d
github.com/filecoin-project/venus-auth v1.6.0
github.com/filecoin-project/venus-messager v1.6.0
github.com/gbrlsnchs/jwt/v3 v3.0.1
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -419,8 +419,8 @@ github.com/filecoin-project/storetheindex v0.3.5 h1:KoS9TvjPm6zIZfUH8atAHJbVHOO7
github.com/filecoin-project/storetheindex v0.3.5/go.mod h1:0r3d0kSpK63O6AvLr1CjAINLi+nWD49clzcnKV+GLpI=
github.com/filecoin-project/test-vectors/schema v0.0.5/go.mod h1:iQ9QXLpYWL3m7warwvK1JC/pTri8mnfEmKygNDqqY6E=
github.com/filecoin-project/venus v1.6.0/go.mod h1:ukA+xwqDs40lixoa+HDNfuN8b1G4jpm4k0ujceVejSk=
github.com/filecoin-project/venus v1.6.1-0.20220707095905-e14afe441234 h1:gsy9AlBtgYALUgMA5Kj2+PEJk2BJ4D/9+NkpG9vFQpA=
github.com/filecoin-project/venus v1.6.1-0.20220707095905-e14afe441234/go.mod h1:ukA+xwqDs40lixoa+HDNfuN8b1G4jpm4k0ujceVejSk=
github.com/filecoin-project/venus v1.6.1-0.20220708084319-d8da3536c71d h1:Ps9yagHrEXyeGJw5d3jXHXWNeGKzdEAvC4+J30ADcy4=
github.com/filecoin-project/venus v1.6.1-0.20220708084319-d8da3536c71d/go.mod h1:ukA+xwqDs40lixoa+HDNfuN8b1G4jpm4k0ujceVejSk=
github.com/filecoin-project/venus-auth v1.6.0 h1:DLl7q5g1eh6UTpp98MLpRWAI79k6TUw1Myh/RLeaFpU=
github.com/filecoin-project/venus-auth v1.6.0/go.mod h1:x/Cv3zz9z5O+/uqIKgYtk5UsL7nYu+CtiPjyVQ8Lywg=
github.com/filecoin-project/venus-messager v1.6.0 h1:7R0bHYTXSaTy7220cdSBwXDDgFqwzhFTgl06tNQsAmo=
Expand Down
27 changes: 8 additions & 19 deletions models/badger/storage_deal.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ func (sdr *storageDealRepo) GetDealByAddrAndStatus(ctx context.Context, addr add
var err error
if err = travelDeals(ctx, sdr.ds,
func(deal *types.MinerDeal) (stop bool, err error) {
if deal.ClientDealProposal.Proposal.Provider == addr {
if addr == address.Undef || deal.ClientDealProposal.Proposal.Provider == addr {
if _, ok := filter[deal.State]; !ok {
return
}
Expand Down Expand Up @@ -275,29 +275,18 @@ func (sdr *storageDealRepo) GetDealByDealID(ctx context.Context, mAddr address.A
return deal, err
}

func (sdr *storageDealRepo) GetDealsByPieceStatusV0(ctx context.Context, mAddr address.Address, pieceStatus types.PieceStatus) ([]*types.MinerDeal, error) {
var deals []*types.MinerDeal
var err error
if err = travelDeals(ctx, sdr.ds,
func(inDeal *types.MinerDeal) (bool, error) {
if inDeal.ClientDealProposal.Proposal.Provider == mAddr && inDeal.PieceStatus == pieceStatus {
deals = append(deals, inDeal)
}
return false, nil
}); err != nil {
return nil, err
}

return deals, nil
}

func (sdr *storageDealRepo) GetDealsByPieceStatus(ctx context.Context, mAddr address.Address, pieceStatus types.PieceStatus) ([]*types.MinerDeal, error) {
var deals []*types.MinerDeal

return deals, travelDeals(ctx, sdr.ds, func(inDeal *types.MinerDeal) (stop bool, err error) {
if inDeal.ClientDealProposal.Proposal.Provider == mAddr && inDeal.PieceStatus == pieceStatus {
deals = append(deals, inDeal)
if inDeal.PieceStatus != pieceStatus {
return
}
if mAddr != address.Undef && inDeal.ClientDealProposal.Proposal.Provider != mAddr {
return
}

deals = append(deals, inDeal)
return false, nil
})
}
Expand Down
Loading