Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Build indexes for legacy deals #1539

Merged
merged 1 commit into from
Jun 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion gql/resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/filecoin-project/boost/db"
"github.com/filecoin-project/boost/fundmanager"
gqltypes "github.com/filecoin-project/boost/gql/types"
"github.com/filecoin-project/boost/lib/legacy"
"github.com/filecoin-project/boost/lib/mpoolmonitor"
"github.com/filecoin-project/boost/markets/storageadapter"
"github.com/filecoin-project/boost/node/config"
Expand Down Expand Up @@ -61,6 +62,7 @@ type resolver struct {
fundMgr *fundmanager.FundManager
storageMgr *storagemanager.StorageManager
provider *storagemarket.Provider
legacyDeals *legacy.LegacyDealsManager
legacyProv gfm_storagemarket.StorageProvider
legacyDT dtypes.ProviderDataTransfer
ps piecestore.PieceStore
Expand All @@ -73,7 +75,7 @@ type resolver struct {
mpool *mpoolmonitor.MpoolMonitor
}

func NewResolver(ctx context.Context, cfg *config.Boost, r lotus_repo.LockedRepo, h host.Host, dealsDB *db.DealsDB, logsDB *db.LogsDB, retDB *rtvllog.RetrievalLogDB, plDB *db.ProposalLogsDB, fundsDB *db.FundsDB, fundMgr *fundmanager.FundManager, storageMgr *storagemanager.StorageManager, spApi sealingpipeline.API, provider *storagemarket.Provider, legacyProv gfm_storagemarket.StorageProvider, legacyDT dtypes.ProviderDataTransfer, ps piecestore.PieceStore, sa retrievalmarket.SectorAccessor, piecedirectory *piecedirectory.PieceDirectory, publisher *storageadapter.DealPublisher, fullNode v1api.FullNode, ssm *sectorstatemgr.SectorStateMgr, mpool *mpoolmonitor.MpoolMonitor) *resolver {
func NewResolver(ctx context.Context, cfg *config.Boost, r lotus_repo.LockedRepo, h host.Host, dealsDB *db.DealsDB, logsDB *db.LogsDB, retDB *rtvllog.RetrievalLogDB, plDB *db.ProposalLogsDB, fundsDB *db.FundsDB, fundMgr *fundmanager.FundManager, storageMgr *storagemanager.StorageManager, spApi sealingpipeline.API, provider *storagemarket.Provider, legacyDeals *legacy.LegacyDealsManager, legacyProv gfm_storagemarket.StorageProvider, legacyDT dtypes.ProviderDataTransfer, ps piecestore.PieceStore, sa retrievalmarket.SectorAccessor, piecedirectory *piecedirectory.PieceDirectory, publisher *storageadapter.DealPublisher, fullNode v1api.FullNode, ssm *sectorstatemgr.SectorStateMgr, mpool *mpoolmonitor.MpoolMonitor) *resolver {
return &resolver{
ctx: ctx,
cfg: cfg,
Expand All @@ -87,6 +89,7 @@ func NewResolver(ctx context.Context, cfg *config.Boost, r lotus_repo.LockedRepo
fundMgr: fundMgr,
storageMgr: storageMgr,
provider: provider,
legacyDeals: legacyDeals,
legacyProv: legacyProv,
legacyDT: legacyDT,
ps: ps,
Expand Down
4 changes: 2 additions & 2 deletions gql/resolver_legacy.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,8 +115,8 @@ func (r *resolver) LegacyDeals(ctx context.Context, args dealsArgs) (*legacyDeal
}, nil
}

func (r *resolver) LegacyDealsCount() (int32, error) {
dealCount, err := r.legacyProv.LocalDealCount()
func (r *resolver) LegacyDealsCount(ctx context.Context) (int32, error) {
dealCount, err := r.legacyDeals.DealCount(ctx)
if err != nil {
return 0, fmt.Errorf("getting deal count: %w", err)
}
Expand Down
81 changes: 44 additions & 37 deletions gql/resolver_piece.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@ package gql
import (
"context"
"fmt"
"sort"
"time"

"github.com/filecoin-project/boost-gfm/retrievalmarket"
"github.com/filecoin-project/boost-gfm/storagemarket"
gqltypes "github.com/filecoin-project/boost/gql/types"
pdtypes "github.com/filecoin-project/boost/piecedirectory/types"
"github.com/filecoin-project/boostd-data/svc/types"
Expand All @@ -15,6 +15,7 @@ import (
"github.com/graph-gophers/graphql-go"
"github.com/ipfs/go-cid"
"github.com/multiformats/go-multihash"
"golang.org/x/sync/errgroup"
)

type IndexStatus string
Expand Down Expand Up @@ -65,8 +66,10 @@ type pieceResolver struct {
}

type flaggedPieceResolver struct {
Piece *pieceResolver
CreatedAt graphql.Time
PieceCid string
IndexStatus *indexStatus
DealCount int32
CreatedAt graphql.Time
}

type piecesFlaggedArgs struct {
Expand Down Expand Up @@ -117,22 +120,40 @@ func (r *resolver) PiecesFlagged(ctx context.Context, args piecesFlaggedArgs) (*
return nil, err
}

allLegacyDeals, err := r.legacyProv.ListLocalDeals()
if err != nil {
return nil, err
}

var eg errgroup.Group
flaggedPieceResolvers := make([]*flaggedPieceResolver, 0, len(flaggedPieces))
for _, flaggedPiece := range flaggedPieces {
pieceResolver, err := r.pieceStatus(ctx, flaggedPiece.PieceCid, allLegacyDeals)
if err != nil {
return nil, err
}
flaggedPieceResolvers = append(flaggedPieceResolvers, &flaggedPieceResolver{
Piece: pieceResolver,
CreatedAt: graphql.Time{Time: flaggedPiece.CreatedAt},
flaggedPiece := flaggedPiece
eg.Go(func() error {
// Get piece info from local index directory
pieceInfo, pmErr := r.piecedirectory.GetPieceMetadata(ctx, flaggedPiece.PieceCid)
if pmErr != nil && !types.IsNotFound(pmErr) {
return pmErr
}

// Get the state of the piece's index
idxStatus, err := r.getIndexStatus(pieceInfo, pmErr)
if err != nil {
return err
}

flaggedPieceResolvers = append(flaggedPieceResolvers, &flaggedPieceResolver{
PieceCid: flaggedPiece.PieceCid.String(),
IndexStatus: idxStatus,
DealCount: int32(len(pieceInfo.Deals)),
CreatedAt: graphql.Time{Time: flaggedPiece.CreatedAt},
})
return nil
})
}
err = eg.Wait()
if err != nil {
return nil, err
}

sort.Slice(flaggedPieceResolvers, func(i, j int) bool {
return flaggedPieceResolvers[i].CreatedAt.After(flaggedPieces[j].CreatedAt)
})

return &flaggedPieceListResolver{
TotalCount: int32(count),
Expand Down Expand Up @@ -194,15 +215,12 @@ func (r *resolver) PiecesWithRootPayloadCid(ctx context.Context, args struct{ Pa
}

// Get legacy markets deals by payload cid
// TODO: add method to markets to filter deals by payload CID
allLegacyDeals, err := r.legacyProv.ListLocalDeals()
legacyDeals, err := r.legacyDeals.ByPayloadCid(ctx, payloadCid)
if err != nil {
return nil, err
}
for _, dl := range allLegacyDeals {
if dl.Ref.Root == payloadCid {
pieceCidSet[dl.ClientDealProposal.Proposal.PieceCID.String()] = struct{}{}
}
for _, dl := range legacyDeals {
pieceCidSet[dl.ClientDealProposal.Proposal.PieceCID.String()] = struct{}{}
}

pieceCids := make([]string, 0, len(pieceCidSet))
Expand Down Expand Up @@ -258,15 +276,6 @@ func (r *resolver) PieceStatus(ctx context.Context, args struct{ PieceCid string
return nil, fmt.Errorf("%s is not a valid piece cid", args.PieceCid)
}

allLegacyDeals, err := r.legacyProv.ListLocalDeals()
if err != nil {
return nil, err
}

return r.pieceStatus(ctx, pieceCid, allLegacyDeals)
}

func (r *resolver) pieceStatus(ctx context.Context, pieceCid cid.Cid, allLegacyDeals []storagemarket.MinerDeal) (*pieceResolver, error) {
// Get piece info from local index directory
pieceInfo, pmErr := r.piecedirectory.GetPieceMetadata(ctx, pieceCid)
if pmErr != nil && !types.IsNotFound(pmErr) {
Expand All @@ -280,11 +289,9 @@ func (r *resolver) pieceStatus(ctx context.Context, pieceCid cid.Cid, allLegacyD
}

// Get legacy markets deals by piece Cid
var legacyDeals []storagemarket.MinerDeal
for _, dl := range allLegacyDeals {
if dl.Ref.PieceCid != nil && *dl.Ref.PieceCid == pieceCid {
legacyDeals = append(legacyDeals, dl)
}
legacyDeals, err := r.legacyDeals.ByPieceCid(ctx, pieceCid)
if err != nil {
return nil, err
}

// Convert local index directory deals to graphQL format
Expand Down Expand Up @@ -357,7 +364,7 @@ func (r *resolver) pieceStatus(ctx context.Context, pieceCid cid.Cid, allLegacyD
}

// Get the state of the piece's index
idxStatus, err := r.getIndexStatus(ctx, pieceCid, pieceInfo, pmErr, deals)
idxStatus, err := r.getIndexStatus(pieceInfo, pmErr)
if err != nil {
return nil, err
}
Expand All @@ -370,7 +377,7 @@ func (r *resolver) pieceStatus(ctx context.Context, pieceCid cid.Cid, allLegacyD
}, nil
}

func (r *resolver) getIndexStatus(ctx context.Context, pieceCid cid.Cid, md pdtypes.PieceDirMetadata, mdErr error, deals []*pieceDealResolver) (*indexStatus, error) {
func (r *resolver) getIndexStatus(md pdtypes.PieceDirMetadata, mdErr error) (*indexStatus, error) {
var idxst IndexStatus
idxerr := ""

Expand Down
8 changes: 2 additions & 6 deletions gql/resolver_sealingpipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,24 +228,20 @@ func (r *resolver) populateWaitDealsSectors(ctx context.Context, sectorNumbers [
}

// match not found in boost db - fallback to legacy deals list
lds, err := r.legacyProv.ListLocalDeals()
lds, err := r.legacyDeals.ByPublishCid(ctx, *publishCid)
if err != nil {
return nil, err
}

var j int
for ; j < len(lds); j++ {
l := lds[j]
if l.PublishCid == nil {
continue
}

lpcid, err := l.ClientDealProposal.Proposal.Cid()
if err != nil {
return nil, err
}

if l.PublishCid.Equals(*publishCid) && lpcid.Equals(dcid) {
if lpcid.Equals(dcid) {
break
}
}
Expand Down
4 changes: 3 additions & 1 deletion gql/schema.graphql
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,9 @@ type PieceStatus {

type FlaggedPieceStatus {
CreatedAt: Time!
Piece: PieceStatus!
PieceCid: String!
IndexStatus: IndexStatus!
DealCount: Int!
}

type FlaggedPiecesList {
Expand Down
Loading