Skip to content

Commit

Permalink
feat: unsealed from sp through venus-gateway
Browse files Browse the repository at this point in the history
  • Loading branch information
diwufeiwen committed Feb 9, 2023
1 parent 831d33d commit 1d7ce54
Show file tree
Hide file tree
Showing 10 changed files with 186 additions and 45 deletions.
87 changes: 82 additions & 5 deletions dagstore/market_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,15 @@ import (
"github.com/ipfs/go-cid"

"github.com/filecoin-project/dagstore/mount"
"github.com/filecoin-project/dagstore/throttle"
"github.com/filecoin-project/go-padreader"

gatewayAPIV2 "github.com/filecoin-project/venus/venus-shared/api/gateway/v2"
vSharedTypes "github.com/filecoin-project/venus/venus-shared/types"

marketMetrics "github.com/filecoin-project/venus-market/v2/metrics"
"github.com/filecoin-project/venus-market/v2/models/repo"
"github.com/filecoin-project/venus-market/v2/piecestorage"
"github.com/filecoin-project/venus-market/v2/storageprovider"
"github.com/filecoin-project/venus-market/v2/utils"
)

Expand All @@ -35,17 +37,28 @@ type marketAPI struct {
useTransient bool
metricsCtx metrics.MetricsCtx
gatewayMarketClient gatewayAPIV2.IMarketClient

throttle throttle.Throttler
}

var _ MarketAPI = (*marketAPI)(nil)

func NewMarketAPI(ctx metrics.MetricsCtx, repo repo.Repo, pieceStorageMgr *piecestorage.PieceStorageManager, gatewayMarketClient gatewayAPIV2.IMarketClient, useTransient bool) MarketAPI {
func NewMarketAPI(
ctx metrics.MetricsCtx,
repo repo.Repo,
pieceStorageMgr *piecestorage.PieceStorageManager,
gatewayMarketClient gatewayAPIV2.IMarketClient,
useTransient bool,
concurrency int) MarketAPI {

return &marketAPI{
pieceRepo: repo.StorageDealRepo(),
pieceStorageMgr: pieceStorageMgr,
useTransient: useTransient,
metricsCtx: ctx,
gatewayMarketClient: gatewayMarketClient,

throttle: throttle.Fixed(concurrency),
}
}

Expand All @@ -56,11 +69,75 @@ func (m *marketAPI) Start(_ context.Context) error {
func (m *marketAPI) IsUnsealed(ctx context.Context, pieceCid cid.Cid) (bool, error) {
_, err := m.pieceStorageMgr.FindStorageForRead(ctx, pieceCid.String())
if err != nil {
return false, fmt.Errorf("unable to find storage for piece %s %w", pieceCid, err)
log.Warnf("unable to find storage for piece %s: %s", pieceCid, err)

// check it from the SP through venus-gateway
deals, err := m.pieceRepo.GetDealsByPieceCidAndStatus(ctx, pieceCid, storageprovider.ReadyRetrievalDealStatus...)
if err != nil {
return false, fmt.Errorf("get delas for piece %s: %w", pieceCid, err)
}

if len(deals) == 0 {
return false, fmt.Errorf("no storage deals found for piece %s", pieceCid)
}

// check if we have an unsealed deal for the given piece in any of the unsealed sectors.
for _, deal := range deals {
deal := deal

var isUnsealed bool
// Throttle this path to avoid flooding the storage subsystem.
err := m.throttle.Do(ctx, func(ctx context.Context) (err error) {
// todo ProofType can not be passed, SP processes itself?
isUnsealed, err = m.gatewayMarketClient.IsUnsealed(ctx, deal.Proposal.Provider, pieceCid,
deal.SectorNumber,
vSharedTypes.PaddedByteIndex(deal.Offset.Unpadded()),
deal.Proposal.PieceSize)
if err != nil {
return fmt.Errorf("failed to check if sector %d for deal %d was unsealed: %w", deal.SectorNumber, deal.DealID, err)
}

if isUnsealed {
// send SectorsUnsealPiece task
wps, err := m.pieceStorageMgr.FindStorageForWrite(int64(deal.Proposal.PieceSize))
if err != nil {
return fmt.Errorf("failed to find storage to write %s: %w", pieceCid, err)
}

pieceTransfer, err := wps.GetPieceTransfer(ctx, pieceCid.String())
if err != nil {
return fmt.Errorf("get piece transfer for %s: %w", pieceCid, err)
}

return m.gatewayMarketClient.SectorsUnsealPiece(
ctx,
deal.Proposal.Provider,
pieceCid,
deal.SectorNumber,
vSharedTypes.PaddedByteIndex(deal.Offset.Unpadded()),
deal.Proposal.PieceSize,
pieceTransfer,
)
}

return nil
})

if err != nil {
log.Warnf("failed to check/retrieve unsealed sector: %s", err)
continue // move on to the next match.
}

if isUnsealed {
return true, nil
}
}

// we don't have an unsealed sector containing the piece
return false, nil
}

return true, nil
// todo check isunseal from miner
// m.gatewayMarketClient.IsUnsealed()
}

func (m *marketAPI) FetchFromPieceStorage(ctx context.Context, pieceCid cid.Cid) (mount.Reader, error) {
Expand Down
2 changes: 1 addition & 1 deletion dagstore/market_api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func TestMarket(t *testing.T) {
assert.Nil(t, err)

// todo: mock IMarketEvent
marketAPI := NewMarketAPI(ctx, r, pmgr, nil, false)
marketAPI := NewMarketAPI(ctx, r, pmgr, nil, false, 100)

size, err := marketAPI.GetUnpaddedCARSize(ctx, testResourceId)
assert.Nil(t, err)
Expand Down
9 changes: 8 additions & 1 deletion dagstore/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,14 @@ const (

// CreateAndStartMarketAPI creates a new MarketAPI adaptor for the dagstore mounts.
func CreateAndStartMarketAPI(ctx metrics.MetricsCtx, lc fx.Lifecycle, r *config.DAGStoreConfig, repo repo.Repo, pieceStorage *piecestorage.PieceStorageManager, gatewayMarketClient gatewayAPIV2.IMarketClient) (MarketAPI, error) {
mountApi := NewMarketAPI(ctx, repo, pieceStorage, gatewayMarketClient, r.UseTransient)
mountApi := NewMarketAPI(
ctx,
repo,
pieceStorage,
gatewayMarketClient,
r.UseTransient,
r.MaxConcurrencyStorageCalls,
)
lc.Append(fx.Hook{
OnStart: func(ctx context.Context) error {
return mountApi.Start(ctx)
Expand Down
7 changes: 3 additions & 4 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ require (
github.com/filecoin-project/go-statestore v0.2.0
github.com/filecoin-project/specs-actors/v2 v2.3.6
github.com/filecoin-project/specs-actors/v7 v7.0.1
github.com/filecoin-project/venus v1.9.0-rc1.0.20230203064217-14f0513a243e
github.com/filecoin-project/specs-storage v0.4.1
github.com/filecoin-project/venus v1.9.0-rc1.0.20230209064351-61e06b8147ce
github.com/filecoin-project/venus-auth v1.9.0
github.com/filecoin-project/venus-messager v1.9.0-rc1
github.com/golang/mock v1.6.0
Expand All @@ -39,7 +40,7 @@ require (
github.com/howeyc/gopass v0.0.0-20210920133722-c8aef6fb66ef
github.com/ipfs-force-community/metrics v1.0.1-0.20220824061112-ac916bacf2ea
github.com/ipfs-force-community/venus-common-utils v0.0.0-20220217030526-e5e4c6bc14f7
github.com/ipfs-force-community/venus-gateway v1.9.0-rc1
github.com/ipfs-force-community/venus-gateway v1.9.0-rc1.0.20230209065811-00d8c231d59c
github.com/ipfs/go-block-format v0.0.3
github.com/ipfs/go-blockservice v0.4.0
github.com/ipfs/go-cid v0.2.0
Expand Down Expand Up @@ -135,7 +136,6 @@ require (
github.com/filecoin-project/specs-actors/v4 v4.0.2 // indirect
github.com/filecoin-project/specs-actors/v5 v5.0.6 // indirect
github.com/filecoin-project/specs-actors/v6 v6.0.2 // indirect
github.com/filecoin-project/specs-storage v0.4.1 // indirect
github.com/filecoin-project/storetheindex v0.4.17 // indirect
github.com/flynn/noise v1.0.0 // indirect
github.com/francoispqt/gojay v1.2.13 // indirect
Expand Down Expand Up @@ -207,7 +207,6 @@ require (
github.com/libp2p/go-flow-metrics v0.1.0 // indirect
github.com/libp2p/go-libp2p-asn-util v0.2.0 // indirect
github.com/libp2p/go-libp2p-core v0.20.0 // indirect
github.com/libp2p/go-libp2p-peerstore v0.8.0 // indirect
github.com/libp2p/go-libp2p-pubsub v0.8.0 // indirect
github.com/libp2p/go-msgio v0.2.0 // indirect
github.com/libp2p/go-nat v0.1.0 // indirect
Expand Down
11 changes: 7 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -457,8 +457,10 @@ github.com/filecoin-project/storetheindex v0.4.17 h1:w0dVc954TGPukoVbidlYvn9Xt+w
github.com/filecoin-project/storetheindex v0.4.17/go.mod h1:y2dL8C5D3PXi183hdxgGtM8vVYOZ1lg515tpl/D3tN8=
github.com/filecoin-project/test-vectors/schema v0.0.5/go.mod h1:iQ9QXLpYWL3m7warwvK1JC/pTri8mnfEmKygNDqqY6E=
github.com/filecoin-project/venus v1.2.4/go.mod h1:hJULXHGAnWuq5S5KRtPkwbT8DqgM9II7NwyNU7t59D0=
github.com/filecoin-project/venus v1.9.0-rc1.0.20230203064217-14f0513a243e h1:l2EhKlBboVUij2CTf0iHtQvrCmOn52qTbtJAvjDjxss=
github.com/filecoin-project/venus v1.9.0-rc1.0.20230203064217-14f0513a243e/go.mod h1:TIRaBucKJIIe0QxuSWsRkO6fpRdKWdYY990KjQ/Fa38=
github.com/filecoin-project/venus v1.9.0-rc1.0.20230208094740-fb64fb68804c h1:ddyiPXJlMvfXywLDhuOjGzpbFyDJtjuEKpseHPHaasI=
github.com/filecoin-project/venus v1.9.0-rc1.0.20230208094740-fb64fb68804c/go.mod h1:TIRaBucKJIIe0QxuSWsRkO6fpRdKWdYY990KjQ/Fa38=
github.com/filecoin-project/venus v1.9.0-rc1.0.20230209064351-61e06b8147ce h1:9oWTMmlz4MDAfWA22Pq7TsBidA1Ecl9f6nKt9KgriNo=
github.com/filecoin-project/venus v1.9.0-rc1.0.20230209064351-61e06b8147ce/go.mod h1:TIRaBucKJIIe0QxuSWsRkO6fpRdKWdYY990KjQ/Fa38=
github.com/filecoin-project/venus-auth v1.3.2/go.mod h1:m5Jog2GYxztwP7w3m/iJdv/V1/bTcAVU9rm/CbhxRQU=
github.com/filecoin-project/venus-auth v1.9.0 h1:GH0o/jPdF55/U/uLoMzrqR9+DOsMf5oWM/X4UPuyWPA=
github.com/filecoin-project/venus-auth v1.9.0/go.mod h1:Ckj8F/iuSgXnCb9LvH0IiPR7swJZQAhabDOxVycLGWs=
Expand Down Expand Up @@ -821,6 +823,8 @@ github.com/ipfs-force-community/venus-common-utils v0.0.0-20220217030526-e5e4c6b
github.com/ipfs-force-community/venus-common-utils v0.0.0-20220217030526-e5e4c6bc14f7/go.mod h1:sSTUXgIu95tPHvgcYhdLuELmgPJWCP/pNMFtsrVtOyA=
github.com/ipfs-force-community/venus-gateway v1.9.0-rc1 h1:eXBREWs0mkscx+vrwcgCNal+QlNGqazULU5kargzkhU=
github.com/ipfs-force-community/venus-gateway v1.9.0-rc1/go.mod h1:+TXUDejSPrY8aNLJcbAn5e6QU3xxirhxNvHUszGuESY=
github.com/ipfs-force-community/venus-gateway v1.9.0-rc1.0.20230209065811-00d8c231d59c h1:dBI04AQ/y+XrOWkNhOCZUfvk1mMk+xbbVH/XGmwA88c=
github.com/ipfs-force-community/venus-gateway v1.9.0-rc1.0.20230209065811-00d8c231d59c/go.mod h1:ubypIpKeBtVOyW0stHYykBpIh+4vub8l8uAbgxP8710=
github.com/ipfs/bbloom v0.0.1/go.mod h1:oqo8CVWsJFMOZqTglBG4wydCE4IQA/G2/SEofB0rjUI=
github.com/ipfs/bbloom v0.0.4 h1:Gi+8EGJ2y5qiD5FbsbpX/TMNcJw8gSqr7eyjHa4Fhvs=
github.com/ipfs/bbloom v0.0.4/go.mod h1:cS9YprKXpoZ9lT0n/Mw/a6/aFV6DTjTLYHeA+gyqMG0=
Expand Down Expand Up @@ -1349,8 +1353,7 @@ github.com/libp2p/go-libp2p-peerstore v0.2.7/go.mod h1:ss/TWTgHZTMpsU/oKVVPQCGuD
github.com/libp2p/go-libp2p-peerstore v0.2.8/go.mod h1:gGiPlXdz7mIHd2vfAsHzBNAMqSDkt2UBFwgcITgw1lA=
github.com/libp2p/go-libp2p-peerstore v0.4.0/go.mod h1:rDJUFyzEWPpXpEwywkcTYYzDHlwza8riYMaUzaN6hX0=
github.com/libp2p/go-libp2p-peerstore v0.6.0/go.mod h1:DGEmKdXrcYpK9Jha3sS7MhqYdInxJy84bIPtSu65bKc=
github.com/libp2p/go-libp2p-peerstore v0.8.0 h1:bzTG693TA1Ju/zKmUCQzDLSqiJnyRFVwPpuloZ/OZtI=
github.com/libp2p/go-libp2p-peerstore v0.8.0/go.mod h1:9geHWmNA3YDlQBjL/uPEJD6vpDK12aDNlUNHJ6kio/s=
github.com/libp2p/go-libp2p-peerstore v0.7.1 h1:7FpALlqR+3+oOBXdzm3AVt0vjMYLW1b7jM03E4iEHlw=
github.com/libp2p/go-libp2p-pnet v0.2.0/go.mod h1:Qqvq6JH/oMZGwqs3N1Fqhv8NVhrdYcO0BW4wssv21LA=
github.com/libp2p/go-libp2p-protocol v0.0.1/go.mod h1:Af9n4PiruirSDjHycM1QuiMi/1VZNHYcK8cLgFJLZ4s=
github.com/libp2p/go-libp2p-protocol v0.1.0/go.mod h1:KQPHpAabB57XQxGrXCNvbL6UEXfQqUgC/1adR2Xtflk=
Expand Down
38 changes: 29 additions & 9 deletions piecestorage/filestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,26 +2,28 @@ package piecestorage

import (
"context"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"os"
"path"

"github.com/filecoin-project/dagstore/mount"

"github.com/filecoin-project/venus/pkg/util/fsutil"
"github.com/filecoin-project/venus/venus-shared/types/market"

"github.com/filecoin-project/dagstore/mount"
"github.com/filecoin-project/venus-market/v2/config"
"github.com/filecoin-project/venus-market/v2/utils"
"github.com/filecoin-project/venus/pkg/util/fsutil"
)

type fsPieceStorage struct {
baseUrl string
fsCfg *config.FsPieceStorage
}

func (f *fsPieceStorage) Len(ctx context.Context, resourceId string) (int64, error) {
func (f *fsPieceStorage) Len(_ context.Context, resourceId string) (int64, error) {
st, err := os.Stat(path.Join(f.baseUrl, resourceId))
if err != nil {
return 0, err
Expand All @@ -33,7 +35,7 @@ func (f *fsPieceStorage) Len(ctx context.Context, resourceId string) (int64, err
return st.Size(), err
}

func (f *fsPieceStorage) ListResourceIds(ctx context.Context) ([]string, error) {
func (f *fsPieceStorage) ListResourceIds(_ context.Context) ([]string, error) {
entries, err := os.ReadDir(f.baseUrl)
if err != nil {
return nil, err
Expand All @@ -47,7 +49,7 @@ func (f *fsPieceStorage) ListResourceIds(ctx context.Context) ([]string, error)
return resources, nil
}

func (f *fsPieceStorage) SaveTo(ctx context.Context, resourceId string, r io.Reader) (int64, error) {
func (f *fsPieceStorage) SaveTo(_ context.Context, resourceId string, r io.Reader) (int64, error) {
if f.fsCfg.ReadOnly {
return 0, fmt.Errorf("do not write to a 'readonly' piece store")
}
Expand All @@ -67,7 +69,7 @@ func (f *fsPieceStorage) SaveTo(ctx context.Context, resourceId string, r io.Rea
return wlen, err
}

func (f *fsPieceStorage) GetReaderCloser(ctx context.Context, resourceId string) (io.ReadCloser, error) {
func (f *fsPieceStorage) GetReaderCloser(_ context.Context, resourceId string) (io.ReadCloser, error) {
dstPath := path.Join(f.baseUrl, resourceId)
fs, err := os.Open(dstPath)
if err != nil {
Expand All @@ -76,7 +78,7 @@ func (f *fsPieceStorage) GetReaderCloser(ctx context.Context, resourceId string)
return fs, nil
}

func (f *fsPieceStorage) GetMountReader(ctx context.Context, resourceId string) (mount.Reader, error) {
func (f *fsPieceStorage) GetMountReader(_ context.Context, resourceId string) (mount.Reader, error) {
dstPath := path.Join(f.baseUrl, resourceId)
fs, err := os.Open(dstPath)
if err != nil {
Expand All @@ -89,7 +91,25 @@ func (f *fsPieceStorage) GetRedirectUrl(_ context.Context, _ string) (string, er
return "", ErrUnsupportRedirect
}

func (f *fsPieceStorage) Has(ctx context.Context, resourceId string) (bool, error) {
func (f *fsPieceStorage) GetPieceTransfer(_ context.Context, pieceCid string) (*market.Transfer, error) {
if f.fsCfg.ReadOnly {
return nil, fmt.Errorf("%s id readonly piece store", f.fsCfg.Name)
}

dstPath := path.Join(f.baseUrl, pieceCid)
transfer := market.FsTransfer{Path: dstPath}
params, err := json.Marshal(&transfer)
if err != nil {
return nil, fmt.Errorf("construct piece transfer: %w", err)
}

return &market.Transfer{
Type: market.PiecesTransferFs,
Params: params,
}, nil
}

func (f *fsPieceStorage) Has(_ context.Context, resourceId string) (bool, error) {
_, err := os.Stat(path.Join(f.baseUrl, resourceId))
if err != nil {
if os.IsNotExist(err) {
Expand All @@ -100,7 +120,7 @@ func (f *fsPieceStorage) Has(ctx context.Context, resourceId string) (bool, erro
return true, nil
}

func (f *fsPieceStorage) Validate(resourceId string) error {
func (f *fsPieceStorage) Validate(_ string) error {
st, err := os.Stat(f.baseUrl)
if err != nil {
if os.IsNotExist(err) {
Expand Down
4 changes: 4 additions & 0 deletions piecestorage/memstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,10 @@ func (m *MemPieceStore) GetRedirectUrl(_ context.Context, resourceId string) (st
return "", ErrUnsupportRedirect
}

func (m *MemPieceStore) GetPieceTransfer(context.Context, string) (*market.Transfer, error) {
return &market.Transfer{}, nil
}

func (m *MemPieceStore) Validate(s string) error {
return nil
}
Expand Down
Loading

0 comments on commit 1d7ce54

Please sign in to comment.