Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: unsealed from sp through venus-gateway / 通过venus-gateway给SP下发unsealed请求 #267

Merged
merged 1 commit into from
Mar 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 82 additions & 5 deletions dagstore/market_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,15 @@ import (
"github.com/ipfs/go-cid"

"github.com/filecoin-project/dagstore/mount"
"github.com/filecoin-project/dagstore/throttle"
"github.com/filecoin-project/go-padreader"

gatewayAPIV2 "github.com/filecoin-project/venus/venus-shared/api/gateway/v2"
vSharedTypes "github.com/filecoin-project/venus/venus-shared/types"

marketMetrics "github.com/filecoin-project/venus-market/v2/metrics"
"github.com/filecoin-project/venus-market/v2/models/repo"
"github.com/filecoin-project/venus-market/v2/piecestorage"
"github.com/filecoin-project/venus-market/v2/storageprovider"
"github.com/filecoin-project/venus-market/v2/utils"
)

Expand All @@ -35,17 +37,28 @@ type marketAPI struct {
useTransient bool
metricsCtx metrics.MetricsCtx
gatewayMarketClient gatewayAPIV2.IMarketClient

throttle throttle.Throttler
}

var _ MarketAPI = (*marketAPI)(nil)

func NewMarketAPI(ctx metrics.MetricsCtx, repo repo.Repo, pieceStorageMgr *piecestorage.PieceStorageManager, gatewayMarketClient gatewayAPIV2.IMarketClient, useTransient bool) MarketAPI {
func NewMarketAPI(
ctx metrics.MetricsCtx,
repo repo.Repo,
pieceStorageMgr *piecestorage.PieceStorageManager,
gatewayMarketClient gatewayAPIV2.IMarketClient,
useTransient bool,
concurrency int) MarketAPI {

return &marketAPI{
pieceRepo: repo.StorageDealRepo(),
pieceStorageMgr: pieceStorageMgr,
useTransient: useTransient,
metricsCtx: ctx,
gatewayMarketClient: gatewayMarketClient,

throttle: throttle.Fixed(concurrency),
}
}

Expand All @@ -56,11 +69,75 @@ func (m *marketAPI) Start(_ context.Context) error {
func (m *marketAPI) IsUnsealed(ctx context.Context, pieceCid cid.Cid) (bool, error) {
_, err := m.pieceStorageMgr.FindStorageForRead(ctx, pieceCid.String())
if err != nil {
return false, fmt.Errorf("unable to find storage for piece %s %w", pieceCid, err)
log.Warnf("unable to find storage for piece %s: %s", pieceCid, err)

// check it from the SP through venus-gateway
deals, err := m.pieceRepo.GetDealsByPieceCidAndStatus(ctx, pieceCid, storageprovider.ReadyRetrievalDealStatus...)
if err != nil {
return false, fmt.Errorf("get delas for piece %s: %w", pieceCid, err)
}

if len(deals) == 0 {
return false, fmt.Errorf("no storage deals found for piece %s", pieceCid)
}

// check if we have an unsealed deal for the given piece in any of the unsealed sectors.
for _, deal := range deals {
deal := deal

var isUnsealed bool
// Throttle this path to avoid flooding the storage subsystem.
err := m.throttle.Do(ctx, func(ctx context.Context) (err error) {
// todo ProofType can not be passed, SP processes itself?
isUnsealed, err = m.gatewayMarketClient.IsUnsealed(ctx, deal.Proposal.Provider, pieceCid,
deal.SectorNumber,
vSharedTypes.PaddedByteIndex(deal.Offset.Unpadded()),
deal.Proposal.PieceSize)
if err != nil {
return fmt.Errorf("failed to check if sector %d for deal %d was unsealed: %w", deal.SectorNumber, deal.DealID, err)
}

if isUnsealed {
// send SectorsUnsealPiece task
wps, err := m.pieceStorageMgr.FindStorageForWrite(int64(deal.Proposal.PieceSize))
if err != nil {
return fmt.Errorf("failed to find storage to write %s: %w", pieceCid, err)
}

pieceTransfer, err := wps.GetPieceTransfer(ctx, pieceCid.String())
if err != nil {
return fmt.Errorf("get piece transfer for %s: %w", pieceCid, err)
}

return m.gatewayMarketClient.SectorsUnsealPiece(
ctx,
deal.Proposal.Provider,
pieceCid,
deal.SectorNumber,
vSharedTypes.PaddedByteIndex(deal.Offset.Unpadded()),
deal.Proposal.PieceSize,
pieceTransfer,
)
}

return nil
})

if err != nil {
log.Warnf("failed to check/retrieve unsealed sector: %s", err)
continue // move on to the next match.
}

if isUnsealed {
return true, nil
}
}

// we don't have an unsealed sector containing the piece
return false, nil
}

return true, nil
// todo check isunseal from miner
// m.gatewayMarketClient.IsUnsealed()
}

func (m *marketAPI) FetchFromPieceStorage(ctx context.Context, pieceCid cid.Cid) (mount.Reader, error) {
Expand Down
2 changes: 1 addition & 1 deletion dagstore/market_api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func TestMarket(t *testing.T) {
assert.Nil(t, err)

// todo: mock IMarketEvent
marketAPI := NewMarketAPI(ctx, r, pmgr, nil, false)
marketAPI := NewMarketAPI(ctx, r, pmgr, nil, false, 100)

size, err := marketAPI.GetUnpaddedCARSize(ctx, testResourceId)
assert.Nil(t, err)
Expand Down
9 changes: 8 additions & 1 deletion dagstore/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,14 @@ const (

// CreateAndStartMarketAPI creates a new MarketAPI adaptor for the dagstore mounts.
func CreateAndStartMarketAPI(ctx metrics.MetricsCtx, lc fx.Lifecycle, r *config.DAGStoreConfig, repo repo.Repo, pieceStorage *piecestorage.PieceStorageManager, gatewayMarketClient gatewayAPIV2.IMarketClient) (MarketAPI, error) {
mountApi := NewMarketAPI(ctx, repo, pieceStorage, gatewayMarketClient, r.UseTransient)
mountApi := NewMarketAPI(
ctx,
repo,
pieceStorage,
gatewayMarketClient,
r.UseTransient,
r.MaxConcurrencyStorageCalls,
)
lc.Append(fx.Hook{
OnStart: func(ctx context.Context) error {
return mountApi.Start(ctx)
Expand Down
5 changes: 2 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ require (
github.com/filecoin-project/go-statestore v0.2.0
github.com/filecoin-project/specs-actors/v2 v2.3.6
github.com/filecoin-project/specs-actors/v7 v7.0.1
github.com/filecoin-project/venus v1.10.2-0.20230316084941-2180049a244b
github.com/filecoin-project/venus v1.10.2-0.20230320061738-c105ca752826
github.com/filecoin-project/venus-auth v1.10.2-0.20230308100319-913815325d5e
github.com/filecoin-project/venus-messager v1.10.2-0.20230309071456-7cd8d49c6e9a
github.com/golang/mock v1.6.0
Expand All @@ -40,7 +40,7 @@ require (
github.com/howeyc/gopass v0.0.0-20210920133722-c8aef6fb66ef
github.com/ipfs-force-community/metrics v1.0.1-0.20220824061112-ac916bacf2ea
github.com/ipfs-force-community/venus-common-utils v0.0.0-20220217030526-e5e4c6bc14f7
github.com/ipfs-force-community/venus-gateway v1.10.2-0.20230309021833-84fdba83f37d
github.com/ipfs-force-community/venus-gateway v1.10.2-0.20230320070449-17b514ccd356
github.com/ipfs/go-blockservice v0.4.0
github.com/ipfs/go-cid v0.3.2
github.com/ipfs/go-cidutil v0.1.0
Expand Down Expand Up @@ -137,7 +137,6 @@ require (
github.com/filecoin-project/specs-actors/v4 v4.0.2 // indirect
github.com/filecoin-project/specs-actors/v5 v5.0.6 // indirect
github.com/filecoin-project/specs-actors/v6 v6.0.2 // indirect
github.com/filecoin-project/specs-storage v0.4.1 // indirect
github.com/filecoin-project/storetheindex v0.4.30-0.20221114113647-683091f8e893 // indirect
github.com/flynn/noise v1.0.0 // indirect
github.com/francoispqt/gojay v1.2.13 // indirect
Expand Down
10 changes: 4 additions & 6 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -410,7 +410,6 @@ github.com/filecoin-project/go-state-types v0.1.0/go.mod h1:ezYnPf0bNkTsDibL/psS
github.com/filecoin-project/go-state-types v0.1.1-0.20210810190654-139e0e79e69e/go.mod h1:ezYnPf0bNkTsDibL/psSz5dy4B5awOJ/E7P2Saeep8g=
github.com/filecoin-project/go-state-types v0.1.1/go.mod h1:ezYnPf0bNkTsDibL/psSz5dy4B5awOJ/E7P2Saeep8g=
github.com/filecoin-project/go-state-types v0.1.3/go.mod h1:ezYnPf0bNkTsDibL/psSz5dy4B5awOJ/E7P2Saeep8g=
github.com/filecoin-project/go-state-types v0.1.4/go.mod h1:xCA/WfKlC2zcn3fUmDv4IrzznwS98X5XW/irUP3Lhxg=
github.com/filecoin-project/go-state-types v0.1.6/go.mod h1:UwGVoMsULoCK+bWjEdd/xLCvLAQFBC7EDT477SKml+Q=
github.com/filecoin-project/go-state-types v0.1.8/go.mod h1:UwGVoMsULoCK+bWjEdd/xLCvLAQFBC7EDT477SKml+Q=
github.com/filecoin-project/go-state-types v0.1.10/go.mod h1:UwGVoMsULoCK+bWjEdd/xLCvLAQFBC7EDT477SKml+Q=
Expand Down Expand Up @@ -457,14 +456,13 @@ github.com/filecoin-project/specs-actors/v7 v7.0.1/go.mod h1:tPLEYXoXhcpyLh69Ccq
github.com/filecoin-project/specs-actors/v8 v8.0.1 h1:4u0tIRJeT5G7F05lwLRIsDnsrN+bJ5Ixj6h49Q7uE2Y=
github.com/filecoin-project/specs-storage v0.2.2/go.mod h1:6cc/lncmAxMUocPi0z1EPCX63beIX7F7UnlmUZ3hLQo=
github.com/filecoin-project/specs-storage v0.4.1 h1:yvLEaLZj8f+uByhNC4mFOtCUyL2wQku+NGBp6hjTe9M=
github.com/filecoin-project/specs-storage v0.4.1/go.mod h1:Z2eK6uMwAOSLjek6+sy0jNV2DSsMEENziMUz0GHRFBw=
github.com/filecoin-project/storetheindex v0.3.5/go.mod h1:0r3d0kSpK63O6AvLr1CjAINLi+nWD49clzcnKV+GLpI=
github.com/filecoin-project/storetheindex v0.4.30-0.20221114113647-683091f8e893 h1:6GCuzxLVHBzlz7y+FkbHh6n0UyoEGWqDwJKQPJoz7bE=
github.com/filecoin-project/storetheindex v0.4.30-0.20221114113647-683091f8e893/go.mod h1:S7590oDimBvXMUtzWsBXoshu9HtYKwtXl47zAK9rcP8=
github.com/filecoin-project/test-vectors/schema v0.0.5/go.mod h1:iQ9QXLpYWL3m7warwvK1JC/pTri8mnfEmKygNDqqY6E=
github.com/filecoin-project/venus v1.2.4/go.mod h1:hJULXHGAnWuq5S5KRtPkwbT8DqgM9II7NwyNU7t59D0=
github.com/filecoin-project/venus v1.10.2-0.20230316084941-2180049a244b h1:T0TIlKIFsOvOKDIADN4P7xBC+coJ7YlbUqift11TCIE=
github.com/filecoin-project/venus v1.10.2-0.20230316084941-2180049a244b/go.mod h1:d6XlyUBZd+SHydYimTTkUr3O5rjYOHQTsydI4Nxy6U8=
github.com/filecoin-project/venus v1.10.2-0.20230320061738-c105ca752826 h1:NnZjr537T102iDVdxm8e/5+t7gJmtfSkpay26KQS9LY=
github.com/filecoin-project/venus v1.10.2-0.20230320061738-c105ca752826/go.mod h1:d6XlyUBZd+SHydYimTTkUr3O5rjYOHQTsydI4Nxy6U8=
github.com/filecoin-project/venus-auth v1.3.2/go.mod h1:m5Jog2GYxztwP7w3m/iJdv/V1/bTcAVU9rm/CbhxRQU=
github.com/filecoin-project/venus-auth v1.10.2-0.20230308100319-913815325d5e h1:Bxpt1AzPeNxmUnFT2Y8rpabr9x0wIC0Q87DeRmjL2co=
github.com/filecoin-project/venus-auth v1.10.2-0.20230308100319-913815325d5e/go.mod h1:aBfIfNxQkdcY8Rk5wrQn9qRtJpH4RTDdc10Ac+ferzs=
Expand Down Expand Up @@ -827,8 +825,8 @@ github.com/ipfs-force-community/metrics v1.0.1-0.20220824061112-ac916bacf2ea/go.
github.com/ipfs-force-community/venus-common-utils v0.0.0-20210924063144-1d3a5b30de87/go.mod h1:RTVEOzM+hkpqmcEWpyLDkx1oGO5r9ZWCgYxG/CsXzJQ=
github.com/ipfs-force-community/venus-common-utils v0.0.0-20220217030526-e5e4c6bc14f7 h1:v/1/INcqm3kHLauWQYB63MwWJRWGz+3WEuUPp0jzIl8=
github.com/ipfs-force-community/venus-common-utils v0.0.0-20220217030526-e5e4c6bc14f7/go.mod h1:sSTUXgIu95tPHvgcYhdLuELmgPJWCP/pNMFtsrVtOyA=
github.com/ipfs-force-community/venus-gateway v1.10.2-0.20230309021833-84fdba83f37d h1:/ajr1CjW48GP0vwZgONtUOF74nImn6p4dybeGd0UF98=
github.com/ipfs-force-community/venus-gateway v1.10.2-0.20230309021833-84fdba83f37d/go.mod h1:+TOIuiXWzfS5au9pK5oyU8AzDn/aMJhzSXXlQkVwf5A=
github.com/ipfs-force-community/venus-gateway v1.10.2-0.20230320070449-17b514ccd356 h1:j+EdBUhTFZgQBoC+AQuucDlIGpdvRvjWZmtn3GIuMsU=
github.com/ipfs-force-community/venus-gateway v1.10.2-0.20230320070449-17b514ccd356/go.mod h1:J2VCU6ANymkl8MjjpHj+y2Wai7EGM1Htd6ImSD5Entw=
github.com/ipfs/bbloom v0.0.1/go.mod h1:oqo8CVWsJFMOZqTglBG4wydCE4IQA/G2/SEofB0rjUI=
github.com/ipfs/bbloom v0.0.4 h1:Gi+8EGJ2y5qiD5FbsbpX/TMNcJw8gSqr7eyjHa4Fhvs=
github.com/ipfs/bbloom v0.0.4/go.mod h1:cS9YprKXpoZ9lT0n/Mw/a6/aFV6DTjTLYHeA+gyqMG0=
Expand Down
38 changes: 29 additions & 9 deletions piecestorage/filestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,26 +2,28 @@ package piecestorage

import (
"context"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"os"
"path"

"github.com/filecoin-project/dagstore/mount"

"github.com/filecoin-project/venus/pkg/util/fsutil"
"github.com/filecoin-project/venus/venus-shared/types/market"

"github.com/filecoin-project/dagstore/mount"
"github.com/filecoin-project/venus-market/v2/config"
"github.com/filecoin-project/venus-market/v2/utils"
"github.com/filecoin-project/venus/pkg/util/fsutil"
)

type fsPieceStorage struct {
baseUrl string
fsCfg *config.FsPieceStorage
}

func (f *fsPieceStorage) Len(ctx context.Context, resourceId string) (int64, error) {
func (f *fsPieceStorage) Len(_ context.Context, resourceId string) (int64, error) {
st, err := os.Stat(path.Join(f.baseUrl, resourceId))
if err != nil {
return 0, err
Expand All @@ -33,7 +35,7 @@ func (f *fsPieceStorage) Len(ctx context.Context, resourceId string) (int64, err
return st.Size(), err
}

func (f *fsPieceStorage) ListResourceIds(ctx context.Context) ([]string, error) {
func (f *fsPieceStorage) ListResourceIds(_ context.Context) ([]string, error) {
entries, err := os.ReadDir(f.baseUrl)
if err != nil {
return nil, err
Expand All @@ -47,7 +49,7 @@ func (f *fsPieceStorage) ListResourceIds(ctx context.Context) ([]string, error)
return resources, nil
}

func (f *fsPieceStorage) SaveTo(ctx context.Context, resourceId string, r io.Reader) (int64, error) {
func (f *fsPieceStorage) SaveTo(_ context.Context, resourceId string, r io.Reader) (int64, error) {
if f.fsCfg.ReadOnly {
return 0, fmt.Errorf("do not write to a 'readonly' piece store")
}
Expand All @@ -67,7 +69,7 @@ func (f *fsPieceStorage) SaveTo(ctx context.Context, resourceId string, r io.Rea
return wlen, err
}

func (f *fsPieceStorage) GetReaderCloser(ctx context.Context, resourceId string) (io.ReadCloser, error) {
func (f *fsPieceStorage) GetReaderCloser(_ context.Context, resourceId string) (io.ReadCloser, error) {
dstPath := path.Join(f.baseUrl, resourceId)
fs, err := os.Open(dstPath)
if err != nil {
Expand All @@ -76,7 +78,7 @@ func (f *fsPieceStorage) GetReaderCloser(ctx context.Context, resourceId string)
return fs, nil
}

func (f *fsPieceStorage) GetMountReader(ctx context.Context, resourceId string) (mount.Reader, error) {
func (f *fsPieceStorage) GetMountReader(_ context.Context, resourceId string) (mount.Reader, error) {
dstPath := path.Join(f.baseUrl, resourceId)
fs, err := os.Open(dstPath)
if err != nil {
Expand All @@ -89,7 +91,25 @@ func (f *fsPieceStorage) GetRedirectUrl(_ context.Context, _ string) (string, er
return "", ErrUnsupportRedirect
}

func (f *fsPieceStorage) Has(ctx context.Context, resourceId string) (bool, error) {
func (f *fsPieceStorage) GetPieceTransfer(_ context.Context, pieceCid string) (*market.Transfer, error) {
if f.fsCfg.ReadOnly {
return nil, fmt.Errorf("%s id readonly piece store", f.fsCfg.Name)
}

dstPath := path.Join(f.baseUrl, pieceCid)
transfer := market.FsTransfer{Path: dstPath}
params, err := json.Marshal(&transfer)
if err != nil {
return nil, fmt.Errorf("construct piece transfer: %w", err)
}

return &market.Transfer{
Type: market.PiecesTransferFs,
Params: params,
}, nil
}

func (f *fsPieceStorage) Has(_ context.Context, resourceId string) (bool, error) {
_, err := os.Stat(path.Join(f.baseUrl, resourceId))
if err != nil {
if os.IsNotExist(err) {
Expand All @@ -100,7 +120,7 @@ func (f *fsPieceStorage) Has(ctx context.Context, resourceId string) (bool, erro
return true, nil
}

func (f *fsPieceStorage) Validate(resourceId string) error {
func (f *fsPieceStorage) Validate(_ string) error {
st, err := os.Stat(f.baseUrl)
if err != nil {
if os.IsNotExist(err) {
Expand Down
4 changes: 4 additions & 0 deletions piecestorage/memstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,10 @@ func (m *MemPieceStore) GetRedirectUrl(_ context.Context, resourceId string) (st
return "", ErrUnsupportRedirect
}

func (m *MemPieceStore) GetPieceTransfer(context.Context, string) (*market.Transfer, error) {
return &market.Transfer{}, nil
}

func (m *MemPieceStore) Validate(s string) error {
return nil
}
Expand Down
Loading