Skip to content

Commit

Permalink
feat: unsealed from sp through venus-gateway
Browse files Browse the repository at this point in the history
  • Loading branch information
diwufeiwen committed Jan 10, 2023
1 parent a540f61 commit ece80a8
Show file tree
Hide file tree
Showing 8 changed files with 123 additions and 23 deletions.
101 changes: 97 additions & 4 deletions dagstore/market_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,19 @@ import (
"github.com/ipfs/go-cid"

"github.com/filecoin-project/dagstore/mount"
"github.com/filecoin-project/dagstore/throttle"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-padreader"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/specs-storage/storage"

gatewayAPIV2 "github.com/filecoin-project/venus/venus-shared/api/gateway/v2"
vSharedTypes "github.com/filecoin-project/venus/venus-shared/types"

marketMetrics "github.com/filecoin-project/venus-market/v2/metrics"
"github.com/filecoin-project/venus-market/v2/models/repo"
"github.com/filecoin-project/venus-market/v2/piecestorage"
"github.com/filecoin-project/venus-market/v2/storageprovider"
"github.com/filecoin-project/venus-market/v2/utils"
)

Expand All @@ -35,17 +41,28 @@ type marketAPI struct {
useTransient bool
metricsCtx metrics.MetricsCtx
gatewayMarketClient gatewayAPIV2.IMarketClient

throttle throttle.Throttler
}

var _ MarketAPI = (*marketAPI)(nil)

func NewMarketAPI(ctx metrics.MetricsCtx, repo repo.Repo, pieceStorageMgr *piecestorage.PieceStorageManager, gatewayMarketClient gatewayAPIV2.IMarketClient, useTransient bool) MarketAPI {
func NewMarketAPI(
ctx metrics.MetricsCtx,
repo repo.Repo,
pieceStorageMgr *piecestorage.PieceStorageManager,
gatewayMarketClient gatewayAPIV2.IMarketClient,
useTransient bool,
concurrency int) MarketAPI {

return &marketAPI{
pieceRepo: repo.StorageDealRepo(),
pieceStorageMgr: pieceStorageMgr,
useTransient: useTransient,
metricsCtx: ctx,
gatewayMarketClient: gatewayMarketClient,

throttle: throttle.Fixed(concurrency),
}
}

Expand All @@ -56,11 +73,87 @@ func (m *marketAPI) Start(_ context.Context) error {
func (m *marketAPI) IsUnsealed(ctx context.Context, pieceCid cid.Cid) (bool, error) {
_, err := m.pieceStorageMgr.FindStorageForRead(ctx, pieceCid.String())
if err != nil {
return false, fmt.Errorf("unable to find storage for piece %s %w", pieceCid, err)
log.Warnf("unable to find storage for piece %s: %s", pieceCid, err)

// check it from the SP through venus-gateway
deals, err := m.pieceRepo.GetDealsByPieceCidAndStatus(ctx, pieceCid, storageprovider.ReadyRetrievalDealStatus...)
if err != nil {
return false, fmt.Errorf("get delas for piece %s: %w", pieceCid, err)
}

if len(deals) == 0 {
return false, fmt.Errorf("no storage deals found for piece %s", pieceCid)
}

// check if we have an unsealed deal for the given piece in any of the unsealed sectors.
for _, deal := range deals {
deal := deal

var isUnsealed bool
// Throttle this path to avoid flooding the storage subsystem.
err := m.throttle.Do(ctx, func(ctx context.Context) (err error) {
aid, _ := address.IDFromAddress(deal.Proposal.Provider)
if err != nil {
return err
}

// todo ProofType can not be passed, SP processes itself
sector := storage.SectorRef{ID: abi.SectorID{Miner: abi.ActorID(aid), Number: deal.SectorNumber}}
isUnsealed, err = m.gatewayMarketClient.IsUnsealed(ctx, deal.Proposal.Provider, pieceCid,
sector,
vSharedTypes.PaddedByteIndex(deal.Offset.Unpadded()),
deal.Proposal.PieceSize)
if err != nil {
return fmt.Errorf("failed to check if sector %d for deal %d was unsealed: %w", deal.SectorNumber, deal.DealID, err)
}

if isUnsealed {
// send SectorsUnsealPiece task
wps, err := m.pieceStorageMgr.FindStorageForWrite(int64(deal.Proposal.PieceSize))
if err != nil {
return fmt.Errorf("failed to find storage to write %s: %w", pieceCid, err)
}

redirectUrl := ""
// todo How to deal with the FS, Set the `dist` to be empty, processed offline?
if wps.Type() == piecestorage.FS {

}
if wps.Type() == piecestorage.S3 {
redirectUrl, err = wps.GetRedirectUrl(ctx, wps.GetName())
if err != nil {
return fmt.Errorf("failed to get redirect url for piece storage %s: %w", wps.GetName(), err)
}
}
return m.gatewayMarketClient.SectorsUnsealPiece(
ctx,
deal.Proposal.Provider,
pieceCid,
sector,
vSharedTypes.PaddedByteIndex(deal.Offset.Unpadded()),
deal.Proposal.PieceSize,
redirectUrl,
)
}

return nil
})

if err != nil {
log.Warnf("failed to check/retrieve unsealed sector: %s", err)
continue // move on to the next match.
}

if isUnsealed {
return true, nil
}
}

// we don't have an unsealed sector containing the piece
return false, nil
}

return true, nil
// todo check isunseal from miner
// m.gatewayMarketClient.IsUnsealed()
}

func (m *marketAPI) FetchFromPieceStorage(ctx context.Context, pieceCid cid.Cid) (mount.Reader, error) {
Expand Down
2 changes: 1 addition & 1 deletion dagstore/market_api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func TestMarket(t *testing.T) {
assert.Nil(t, err)

// todo: mock IMarketEvent
marketAPI := NewMarketAPI(ctx, r, pmgr, nil, false)
marketAPI := NewMarketAPI(ctx, r, pmgr, nil, false, 100)

size, err := marketAPI.GetUnpaddedCARSize(ctx, testResourceId)
assert.Nil(t, err)
Expand Down
9 changes: 8 additions & 1 deletion dagstore/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,14 @@ const (

// CreateAndStartMarketAPI creates a new MarketAPI adaptor for the dagstore mounts.
func CreateAndStartMarketAPI(ctx metrics.MetricsCtx, lc fx.Lifecycle, r *config.DAGStoreConfig, repo repo.Repo, pieceStorage *piecestorage.PieceStorageManager, gatewayMarketClient gatewayAPIV2.IMarketClient) (MarketAPI, error) {
mountApi := NewMarketAPI(ctx, repo, pieceStorage, gatewayMarketClient, r.UseTransient)
mountApi := NewMarketAPI(
ctx,
repo,
pieceStorage,
gatewayMarketClient,
r.UseTransient,
r.MaxConcurrencyStorageCalls,
)
lc.Append(fx.Hook{
OnStart: func(ctx context.Context) error {
return mountApi.Start(ctx)
Expand Down
6 changes: 3 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -24,22 +24,22 @@ require (
github.com/filecoin-project/go-fil-markets v1.24.0-v17
github.com/filecoin-project/go-jsonrpc v0.1.5
github.com/filecoin-project/go-padreader v0.0.1
github.com/filecoin-project/go-state-types v0.9.8
github.com/filecoin-project/go-state-types v0.9.9
github.com/filecoin-project/go-statemachine v1.0.2
github.com/filecoin-project/go-statestore v0.2.0
github.com/filecoin-project/specs-actors/v2 v2.3.6
github.com/filecoin-project/specs-actors/v7 v7.0.1
github.com/filecoin-project/venus v1.9.0
github.com/filecoin-project/venus-auth v1.9.0
github.com/filecoin-project/venus-messager v1.9.0
github.com/filecoin-project/venus-messager v1.9.0-rc1
github.com/golang/mock v1.6.0
github.com/google/uuid v1.3.0
github.com/gorilla/mux v1.8.0
github.com/hannahhoward/go-pubsub v0.0.0-20200423002714-8d62886cc36e
github.com/howeyc/gopass v0.0.0-20210920133722-c8aef6fb66ef
github.com/ipfs-force-community/metrics v1.0.1-0.20220824061112-ac916bacf2ea
github.com/ipfs-force-community/venus-common-utils v0.0.0-20220217030526-e5e4c6bc14f7
github.com/ipfs-force-community/venus-gateway v1.9.0
github.com/ipfs-force-community/venus-gateway v1.9.0-rc1
github.com/ipfs/go-block-format v0.0.3
github.com/ipfs/go-blockservice v0.4.0
github.com/ipfs/go-cid v0.2.0
Expand Down
12 changes: 6 additions & 6 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -408,8 +408,8 @@ github.com/filecoin-project/go-state-types v0.1.4/go.mod h1:xCA/WfKlC2zcn3fUmDv4
github.com/filecoin-project/go-state-types v0.1.6/go.mod h1:UwGVoMsULoCK+bWjEdd/xLCvLAQFBC7EDT477SKml+Q=
github.com/filecoin-project/go-state-types v0.1.8/go.mod h1:UwGVoMsULoCK+bWjEdd/xLCvLAQFBC7EDT477SKml+Q=
github.com/filecoin-project/go-state-types v0.1.10/go.mod h1:UwGVoMsULoCK+bWjEdd/xLCvLAQFBC7EDT477SKml+Q=
github.com/filecoin-project/go-state-types v0.9.8 h1:xkdITiR7h691z1tWOhNCJxHI+cq+Mq7ATkpHQ7f1gu8=
github.com/filecoin-project/go-state-types v0.9.8/go.mod h1:+HCZifUV+e8TlQkgll22Ucuiq8OrVJkK+4Kh4u75iiw=
github.com/filecoin-project/go-state-types v0.9.9 h1:gd7Mo6f9jHHpLahttBE88YeQA77i4GK6W5kFdQDnuME=
github.com/filecoin-project/go-state-types v0.9.9/go.mod h1:+HCZifUV+e8TlQkgll22Ucuiq8OrVJkK+4Kh4u75iiw=
github.com/filecoin-project/go-statemachine v0.0.0-20200925024713-05bd7c71fbfe/go.mod h1:FGwQgZAt2Gh5mjlwJUlVB62JeYdo+if0xWxSEfBD9ig=
github.com/filecoin-project/go-statemachine v1.0.1/go.mod h1:jZdXXiHa61n4NmgWFG4w8tnqgvZVHYbJ3yW7+y8bF54=
github.com/filecoin-project/go-statemachine v1.0.2-0.20220322104818-27f8fbb86dfd/go.mod h1:jZdXXiHa61n4NmgWFG4w8tnqgvZVHYbJ3yW7+y8bF54=
Expand Down Expand Up @@ -462,8 +462,8 @@ github.com/filecoin-project/venus v1.9.0/go.mod h1:3fBoP8nPxHYGerersp5m0J5hY0wsD
github.com/filecoin-project/venus-auth v1.3.2/go.mod h1:m5Jog2GYxztwP7w3m/iJdv/V1/bTcAVU9rm/CbhxRQU=
github.com/filecoin-project/venus-auth v1.9.0 h1:GH0o/jPdF55/U/uLoMzrqR9+DOsMf5oWM/X4UPuyWPA=
github.com/filecoin-project/venus-auth v1.9.0/go.mod h1:Ckj8F/iuSgXnCb9LvH0IiPR7swJZQAhabDOxVycLGWs=
github.com/filecoin-project/venus-messager v1.9.0 h1:owOTGzqlVcB46nROj/c1xBoFqBhFnljmWbXjaAKEXZA=
github.com/filecoin-project/venus-messager v1.9.0/go.mod h1:UgUU95+8G7ffHyXO/WLIl75ZBqj1NU9DjxMo5ZN3U+w=
github.com/filecoin-project/venus-messager v1.9.0-rc1 h1:NpjCXY1IMfUhoNvvocNxIirC+Q8iZsQYB8XXHe7s5mY=
github.com/filecoin-project/venus-messager v1.9.0-rc1/go.mod h1:1v7uhBYR1IsIeCud4a+scDVNgVtjCxS/DVIxnXQiuMo=
github.com/flynn/go-shlex v0.0.0-20150515145356-3f9db97f8568/go.mod h1:xEzjJPgXI435gkrCt3MPfRiAkVrwSbHsst4LCFVfpJc=
github.com/flynn/noise v0.0.0-20180327030543-2492fe189ae6/go.mod h1:1i71OnUq3iUe1ma7Lr6yG6/rjvM3emb6yoL7xLFzcVQ=
github.com/flynn/noise v1.0.0 h1:DlTHqmzmvcEiKj+4RYo/imoswx/4r6iBlCMfVtrMXpQ=
Expand Down Expand Up @@ -819,8 +819,8 @@ github.com/ipfs-force-community/metrics v1.0.1-0.20220824061112-ac916bacf2ea/go.
github.com/ipfs-force-community/venus-common-utils v0.0.0-20210924063144-1d3a5b30de87/go.mod h1:RTVEOzM+hkpqmcEWpyLDkx1oGO5r9ZWCgYxG/CsXzJQ=
github.com/ipfs-force-community/venus-common-utils v0.0.0-20220217030526-e5e4c6bc14f7 h1:v/1/INcqm3kHLauWQYB63MwWJRWGz+3WEuUPp0jzIl8=
github.com/ipfs-force-community/venus-common-utils v0.0.0-20220217030526-e5e4c6bc14f7/go.mod h1:sSTUXgIu95tPHvgcYhdLuELmgPJWCP/pNMFtsrVtOyA=
github.com/ipfs-force-community/venus-gateway v1.9.0 h1:/Gaax6I4sJLDQSpMdjBArQyHPNEjRqtMTaOOVGyr5Fs=
github.com/ipfs-force-community/venus-gateway v1.9.0/go.mod h1:5cqgQrA4Sv8nrJENG02JLVe+aourp9ouQJeaHHirrOM=
github.com/ipfs-force-community/venus-gateway v1.9.0-rc1 h1:eXBREWs0mkscx+vrwcgCNal+QlNGqazULU5kargzkhU=
github.com/ipfs-force-community/venus-gateway v1.9.0-rc1/go.mod h1:+TXUDejSPrY8aNLJcbAn5e6QU3xxirhxNvHUszGuESY=
github.com/ipfs/bbloom v0.0.1/go.mod h1:oqo8CVWsJFMOZqTglBG4wydCE4IQA/G2/SEofB0rjUI=
github.com/ipfs/bbloom v0.0.4 h1:Gi+8EGJ2y5qiD5FbsbpX/TMNcJw8gSqr7eyjHa4Fhvs=
github.com/ipfs/bbloom v0.0.4/go.mod h1:cS9YprKXpoZ9lT0n/Mw/a6/aFV6DTjTLYHeA+gyqMG0=
Expand Down
4 changes: 2 additions & 2 deletions piecestorage/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,9 +189,9 @@ func (s s3PieceStorage) GetMountReader(ctx context.Context, resourceId string) (

func (s s3PieceStorage) GetRedirectUrl(ctx context.Context, resourceId string) (string, error) {
if has, err := s.Has(ctx, resourceId); err != nil {
return "", fmt.Errorf("check object: %s exist error:%w", s.subdirWrapper(resourceId), err)
return "", fmt.Errorf("check object %s exist error: %w", s.subdirWrapper(resourceId), err)
} else if !has {
return "", fmt.Errorf("object: %s not exists", s.subdirWrapper(resourceId))
return "", fmt.Errorf("object %s not exists", s.subdirWrapper(resourceId))
}

params := &s3.GetObjectInput{
Expand Down
10 changes: 5 additions & 5 deletions piecestorage/storagemgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ func NewPieceStorageManager(cfg *config.PieceStorage) (*PieceStorageManager, err
// todo: extract name check logic to a function and check blank in name

for _, fsCfg := range cfg.Fs {
// check if storage already exist in manager and it's name is not empty
// check if storage already exist in manager, and it's name is not empty
if fsCfg.Name == "" {
return nil, fmt.Errorf("fs piece storage name is empty, must set storage name in piece storage config `name=yourname`")
}
Expand All @@ -38,7 +38,7 @@ func NewPieceStorageManager(cfg *config.PieceStorage) (*PieceStorageManager, err
}

for _, s3Cfg := range cfg.S3 {
// check if storage already exist in manager and it's name is not empty
// check if storage already exist in manager, and it's name is not empty
if s3Cfg.Name == "" {
return nil, fmt.Errorf("s3 pieceStorage name is empty, must set storage name in piece storage config `name=yourname`")
}
Expand All @@ -64,7 +64,7 @@ func (p *PieceStorageManager) FindStorageForRead(ctx context.Context, s string)
_ = p.EachPieceStorage(func(st IPieceStorage) error {
has, err := st.Has(ctx, s)
if err != nil {
log.Warnf("got error while check avaibale in storageg")
log.Warnf("got error while check avaibale in storage: %s", err.Error())
return nil
}
if has {
Expand All @@ -74,7 +74,7 @@ func (p *PieceStorageManager) FindStorageForRead(ctx context.Context, s string)
})

if len(storages) == 0 {
return nil, fmt.Errorf("unable to find piece in storage %s", s)
return nil, fmt.Errorf("unable to find piece %s in storage", s)
}

return randStorageSelector(storages)
Expand Down Expand Up @@ -125,7 +125,7 @@ func (p *PieceStorageManager) AddPieceStorage(s IPieceStorage) error {
p.lk.Lock()
defer p.lk.Unlock()

// check if storage already exist in manager and it's name is not empty
// check if storage already exist in manager, and it's name is not empty
_, ok := p.storages[s.GetName()]
if ok {
return fmt.Errorf("duplicate storage name: %s", s.GetName())
Expand Down
2 changes: 1 addition & 1 deletion piecestorage/type.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ type IPieceStorage interface {
Len(context.Context, string) (int64, error)
// ListResourceIds get resource ids from piece store
ListResourceIds(ctx context.Context) ([]string, error)
// GetMountReader use direct read if storage have low performance effecitive ReadAt
// GetReaderCloser use direct read if storage have low performance effecitive ReadAt
GetReaderCloser(context.Context, string) (io.ReadCloser, error)
// GetMountReader used to support dagstore
GetMountReader(context.Context, string) (mount.Reader, error)
Expand Down

0 comments on commit ece80a8

Please sign in to comment.