Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat/support multi piece store #115

Merged
merged 5 commits into from
Apr 4, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 10 additions & 13 deletions api/impl/venus_market.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,6 @@ import (
"sort"
"time"

"github.com/filecoin-project/venus-market/models/repo"

"github.com/filecoin-project/venus-market/minermgr"
"github.com/filecoin-project/venus-market/retrievalprovider"
"github.com/filecoin-project/venus/venus-shared/actors/builtin/paych"

"github.com/filecoin-project/venus-market/config"
"github.com/ipfs/go-cid"
logging "github.com/ipfs/go-log/v2"
"github.com/libp2p/go-libp2p-core/host"
Expand All @@ -33,17 +26,21 @@ import (
"github.com/filecoin-project/go-state-types/abi"

clients2 "github.com/filecoin-project/venus-market/api/clients"
"github.com/filecoin-project/venus-market/config"
"github.com/filecoin-project/venus-market/minermgr"
"github.com/filecoin-project/venus-market/models/repo"
"github.com/filecoin-project/venus-market/network"
"github.com/filecoin-project/venus-market/paychmgr"
"github.com/filecoin-project/venus-market/piecestorage"
"github.com/filecoin-project/venus-market/retrievalprovider"
"github.com/filecoin-project/venus-market/storageprovider"
marketapi "github.com/filecoin-project/venus/venus-shared/api/market"
types "github.com/filecoin-project/venus/venus-shared/types/market"

"github.com/filecoin-project/venus-market/paychmgr"

"github.com/filecoin-project/venus/pkg/constants"
"github.com/filecoin-project/venus/venus-shared/actors/builtin/paych"
v1api "github.com/filecoin-project/venus/venus-shared/api/chain/v1"
marketapi "github.com/filecoin-project/venus/venus-shared/api/market"
vTypes "github.com/filecoin-project/venus/venus-shared/types"
types "github.com/filecoin-project/venus/venus-shared/types/market"
)

var _ marketapi.IMarket = (*MarketNodeImpl)(nil)
Expand Down Expand Up @@ -174,12 +171,12 @@ func (m MarketNodeImpl) MarketListIncompleteDeals(ctx context.Context, mAddr add
if mAddr == address.Undef {
deals, err = m.Repo.StorageDealRepo().ListDeal(ctx)
if err != nil {
return nil, err
return nil, fmt.Errorf("get deal: %s", err)
}
} else {
deals, err = m.Repo.StorageDealRepo().ListDealByAddr(ctx, mAddr)
if err != nil {
return nil, err
return nil, fmt.Errorf("get deal for %s: %s", mAddr.String(), err)
}
}

Expand Down
17 changes: 14 additions & 3 deletions cmd/venus-market/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,7 @@ import (
"os"
"strings"

"github.com/filecoin-project/venus-market/version"
logging "github.com/ipfs/go-log/v2"

"github.com/urfave/cli/v2"
"golang.org/x/xerrors"

Expand All @@ -19,6 +17,7 @@ import (
"github.com/filecoin-project/venus-market/config"
_ "github.com/filecoin-project/venus-market/network"
"github.com/filecoin-project/venus-market/piecestorage"
"github.com/filecoin-project/venus-market/version"

_ "github.com/filecoin-project/venus/pkg/crypto/bls"
_ "github.com/filecoin-project/venus/pkg/crypto/secp"
Expand Down Expand Up @@ -111,9 +110,14 @@ var (
Usage: "auth token for connect wallet service",
}

ExternalFsPieceStorageFlag = &cli.StringSliceFlag{
Name: "ex-fs-ps",
Usage: "config external file system storage for piece (eg /mnt/store/f01000}",
}

PieceStorageFlag = &cli.StringFlag{
Name: "piecestorage",
Usage: "config storage for piece (eg fs:/mnt/piece s3:{access key}:{secret key}:{option token}@{region}host/{bucket}",
Usage: "config storage for piece (eg fs:/mnt/piece s3:{access key}:{secret key}:{option token}@{region}host/{bucket})",
}

MysqlDsnFlag = &cli.StringFlag{
Expand Down Expand Up @@ -253,6 +257,12 @@ func flagData(cctx *cli.Context, cfg *config.MarketConfig) error {
cfg.PieceStorage = pieceStorage
}

if cctx.IsSet(ExternalFsPieceStorageFlag.Name) {
cfg.ExternalFsPieceStore.Paths = make([]string, 0)
paths := cctx.StringSlice(ExternalFsPieceStorageFlag.Name)
cfg.ExternalFsPieceStore.Paths = append(cfg.ExternalFsPieceStore.Paths, paths...)
}

if cctx.IsSet(MysqlDsnFlag.Name) {
cfg.Mysql.ConnectionString = cctx.String(MysqlDsnFlag.Name)
}
Expand All @@ -269,6 +279,7 @@ func flagData(cctx *cli.Context, cfg *config.MarketConfig) error {
if len(addrStr) >= 2 {
account = addrStr[1]
}
// todo 这里是追加不是替换
cfg.StorageMiners = append(cfg.StorageMiners, config.User{
Addr: config.Address(addr),
Account: account,
Expand Down
2 changes: 2 additions & 0 deletions cmd/venus-market/pool-run.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/filecoin-project/venus-market/storageprovider"
types2 "github.com/filecoin-project/venus-market/types"
"github.com/filecoin-project/venus-market/utils"

marketapi "github.com/filecoin-project/venus/venus-shared/api/market"
"github.com/filecoin-project/venus/venus-shared/api/permission"
)
Expand All @@ -49,6 +50,7 @@ var poolRunCmd = &cli.Command{
GatewayUrlFlag,
GatewayTokenFlag,
PieceStorageFlag,
ExternalFsPieceStorageFlag,
MysqlDsnFlag,
MinerListFlag,
PaymentAddressFlag,
Expand Down
1 change: 1 addition & 0 deletions cmd/venus-market/solo-run.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ var soloRunCmd = &cli.Command{
WalletUrlFlag,
WalletTokenFlag,
PieceStorageFlag,
ExternalFsPieceStorageFlag,
MysqlDsnFlag,
MinerListFlag,
PaymentAddressFlag,
Expand Down
15 changes: 11 additions & 4 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,11 @@ type S3PieceStorage struct {
Token string
}

// todo 用作检索,不需要显式标注 ReadOnly
type ExternalFsPieceStore struct {
Paths []string
}

type User struct {
Addr Address
Account string
Expand All @@ -191,10 +196,12 @@ type MarketConfig struct {

Mysql Mysql

PieceStorage PieceStorage
Journal Journal
AddressConfig AddressConfig
DAGStore DAGStoreConfig
PieceStorage PieceStorage
// `Pieces` stored when offline deals or not connected to the chain service
ExternalFsPieceStore ExternalFsPieceStore
Journal Journal
AddressConfig AddressConfig
DAGStore DAGStoreConfig

StorageMiners []User
RetrievalPaymentAddress User
Expand Down
13 changes: 9 additions & 4 deletions config/def_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,15 @@ var DefaultMarketConfig = &MarketConfig{
GCInterval: Duration(1 * time.Minute),
},
Journal: Journal{Path: "journal"},
PieceStorage: PieceStorage{Fs: FsPieceStorage{
Enable: true,
Path: "/mnt/piece",
}},
PieceStorage: PieceStorage{
Fs: FsPieceStorage{
Enable: true,
Path: "/mnt/piece",
},
},
ExternalFsPieceStore: ExternalFsPieceStore{
Paths: []string{},
},
ConsiderOnlineStorageDeals: true,
ConsiderOfflineStorageDeals: true,
ConsiderOnlineRetrievalDeals: true,
Expand Down
1 change: 1 addition & 0 deletions config/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ var ConfigServerOpts = func(cfg *MarketConfig) builder.Option {
builder.Override(new(*Mysql), &cfg.Mysql),
builder.Override(new(*Libp2p), &cfg.Libp2p),
builder.Override(new(*PieceStorage), &cfg.PieceStorage),
builder.Override(new(*ExternalFsPieceStore), &cfg.ExternalFsPieceStore),
builder.Override(new(*DAGStoreConfig), &cfg.DAGStore),

// Config (todo: get a real property system)
Expand Down
56 changes: 43 additions & 13 deletions dagstore/market_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,15 @@ import (
"context"
"io"

"github.com/ipfs/go-cid"
"golang.org/x/xerrors"

"github.com/filecoin-project/dagstore/throttle"
"github.com/filecoin-project/go-padreader"

"github.com/filecoin-project/venus-market/models/repo"
"github.com/filecoin-project/venus-market/piecestorage"

"github.com/filecoin-project/dagstore/throttle"
"github.com/ipfs/go-cid"
"golang.org/x/xerrors"
"github.com/filecoin-project/venus-market/piecestorage/external"
)

type MarketAPI interface {
Expand All @@ -21,18 +23,20 @@ type MarketAPI interface {
}

type marketAPI struct {
pieceStorage piecestorage.IPieceStorage
pieceRepo repo.StorageDealRepo
throttle throttle.Throttler
pieceStorage piecestorage.IPieceStorage
exFsPieceStorage external.IExternalPieceStorage
pieceRepo repo.StorageDealRepo
throttle throttle.Throttler
}

var _ MarketAPI = (*marketAPI)(nil)

func NewMinerAPI(repo repo.Repo, pieceStorage piecestorage.IPieceStorage, concurrency int) MarketAPI {
func NewMinerAPI(repo repo.Repo, pieceStorage piecestorage.IPieceStorage, exFsPieceStorage external.IExternalPieceStorage, concurrency int) MarketAPI {
return &marketAPI{
pieceRepo: repo.StorageDealRepo(),
pieceStorage: pieceStorage,
throttle: throttle.Fixed(concurrency),
pieceRepo: repo.StorageDealRepo(),
pieceStorage: pieceStorage,
exFsPieceStorage: exFsPieceStorage,
throttle: throttle.Fixed(concurrency),
}
}

Expand All @@ -41,7 +45,16 @@ func (m *marketAPI) Start(_ context.Context) error {
}

func (m *marketAPI) IsUnsealed(ctx context.Context, pieceCid cid.Cid) (bool, error) {
return m.pieceStorage.Has(ctx, pieceCid.String())
bUnsealed, err := m.pieceStorage.Has(ctx, pieceCid.String())
if err != nil {
return bUnsealed, err
}

if bUnsealed {
return bUnsealed, nil
}

return m.exFsPieceStorage.Has(ctx, pieceCid.String())
}

func (m *marketAPI) FetchUnsealedPiece(ctx context.Context, pieceCid cid.Cid) (io.ReadCloser, error) {
Expand All @@ -66,7 +79,24 @@ func (m *marketAPI) FetchUnsealedPiece(ctx context.Context, pieceCid cid.Cid) (i
return iocloser{r, padR}, nil
}

// todo unseal ask miner who have this data, send unseal cmd, and read and pay after receive data
has, err = m.exFsPieceStorage.Has(ctx, pieceCid.String())
if err != nil {
return nil, err
}
if has {
r, err := m.exFsPieceStorage.Read(ctx, pieceCid.String())
if err != nil {
return nil, err
}

padR, err := padreader.NewInflator(r, payloadSize, pieceSize.Unpadded())
if err != nil {
return nil, err
}
return iocloser{r, padR}, nil
}

// todo unseal: ask miner who have this data, send unseal cmd, and read and pay after receive data
// 1. select miner
// 2. send unseal request
// 3. receive data and return
Expand Down
11 changes: 7 additions & 4 deletions dagstore/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,18 @@ import (
"path/filepath"
"strconv"

"go.uber.org/fx"
"golang.org/x/xerrors"

"github.com/filecoin-project/dagstore"
"github.com/filecoin-project/go-fil-markets/stores"

"github.com/ipfs-force-community/venus-common-utils/builder"
"go.uber.org/fx"
"golang.org/x/xerrors"

"github.com/filecoin-project/venus-market/config"
"github.com/filecoin-project/venus-market/models/repo"
"github.com/filecoin-project/venus-market/piecestorage"
"github.com/filecoin-project/venus-market/piecestorage/external"
)

var (
Expand All @@ -27,8 +30,8 @@ const (
)

// NewMinerAPI creates a new MarketAPI adaptor for the dagstore mounts.
func NewMarketAPI(lc fx.Lifecycle, r *config.DAGStoreConfig, repo repo.Repo, pieceStorage piecestorage.IPieceStorage) (MarketAPI, error) {
mountApi := NewMinerAPI(repo, pieceStorage, r.MaxConcurrencyStorageCalls)
func NewMarketAPI(lc fx.Lifecycle, r *config.DAGStoreConfig, repo repo.Repo, pieceStorage piecestorage.IPieceStorage, exFsPieceStorage external.IExternalPieceStorage) (MarketAPI, error) {
mountApi := NewMinerAPI(repo, pieceStorage, exFsPieceStorage, r.MaxConcurrencyStorageCalls)
lc.Append(fx.Hook{
OnStart: func(ctx context.Context) error {
return mountApi.Start(ctx)
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ require (
github.com/filecoin-project/go-state-types v0.1.3
github.com/filecoin-project/go-statemachine v1.0.1
github.com/filecoin-project/go-statestore v0.2.0
github.com/filecoin-project/specs-actors/v2 v2.3.6
github.com/filecoin-project/specs-actors/v7 v7.0.0-rc1
github.com/filecoin-project/specs-storage v0.2.0
github.com/filecoin-project/venus v1.2.3-0.20220316080815-811c837d0e10
Expand Down Expand Up @@ -75,7 +76,6 @@ require (
github.com/libp2p/go-libp2p-yamux v0.7.0
github.com/libp2p/go-maddr-filter v0.1.0
github.com/mitchellh/go-homedir v1.1.0
github.com/modern-go/reflect2 v1.0.2
github.com/multiformats/go-base32 v0.0.3
github.com/multiformats/go-multiaddr v0.4.1
github.com/multiformats/go-multibase v0.0.3
Expand Down
7 changes: 4 additions & 3 deletions models/mysql/retrieval_deal.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package mysql

import (
"context"
"fmt"
"time"

datatransfer "github.com/filecoin-project/go-data-transfer"
Expand Down Expand Up @@ -120,7 +121,7 @@ func toProviderDealState(deal *retrievalDeal) (*types.ProviderDealState, error)
if len(deal.Receiver) > 0 {
newdeal.Receiver, err = decodePeerId(deal.Receiver)
if err != nil {
return nil, err
return nil, fmt.Errorf("decode receiver: %s", err)
}
}

Expand All @@ -129,11 +130,11 @@ func toProviderDealState(deal *retrievalDeal) (*types.ProviderDealState, error)
newdeal.ChannelID.ID = datatransfer.TransferID(deal.ChannelID.ID)
newdeal.ChannelID.Initiator, err = decodePeerId(deal.ChannelID.Initiator)
if err != nil {
return nil, err
return nil, fmt.Errorf("decode ci_initiator: %s", err)
}
newdeal.ChannelID.Responder, err = decodePeerId(deal.ChannelID.Responder)
if err != nil {
return nil, err
return nil, fmt.Errorf("decode ci_responder: %s", err)
}
}
return newdeal, nil
Expand Down
10 changes: 6 additions & 4 deletions models/mysql/storage_deal.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,21 +253,23 @@ func toStorageDeal(src *storageDeal) (*types.MinerDeal, error) {
md.TransferChannelID.ID = datatransfer.TransferID(src.TransferChannelId.ID)
md.TransferChannelID.Initiator, err = decodePeerId(src.TransferChannelId.Initiator)
if err != nil {
return nil, err
return nil, fmt.Errorf("decode tci_initiator: %s", err)
}
md.TransferChannelID.Responder, err = decodePeerId(src.TransferChannelId.Responder)
if err != nil {
return nil, err
return nil, fmt.Errorf("decode tci_responder: %s", err)
}
}

// todo 导入的数据没有此字段
md.Miner, err = decodePeerId(src.Miner)
if err != nil {
return nil, err
return nil, fmt.Errorf("decode miner_peer: %s", err)
}

md.Client, err = decodePeerId(src.Client)
if err != nil {
return nil, err
return nil, fmt.Errorf("decode client_peer: %s", err)
}

return md, nil
Expand Down
Loading