From 67c9f131c19e03f6d658f3853d9823847748b8e7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=B8=80=E9=A1=B5=E7=B4=A0=E4=B9=A6?= <2931107265@qq.com> Date: Wed, 16 Mar 2022 15:04:04 +0800 Subject: [PATCH 1/5] PieceStore: support multi mount path --- cmd/venus-market/main.go | 19 ++++- config/config.go | 15 +++- config/def_config.go | 13 ++- config/modules.go | 1 + dagstore/market_api.go | 53 +++++++++--- dagstore/modules.go | 11 ++- piecestorage/client_s3.go | 2 +- piecestorage/external/filestore.go | 131 +++++++++++++++++++++++++++++ piecestorage/filestore.go | 11 +-- piecestorage/modules.go | 14 ++- piecestorage/protocol_test.go | 2 +- piecestorage/s3.go | 12 +-- retrievalprovider/modules.go | 2 - storageprovider/deal_assigner.go | 12 +-- 14 files changed, 244 insertions(+), 54 deletions(-) create mode 100644 piecestorage/external/filestore.go diff --git a/cmd/venus-market/main.go b/cmd/venus-market/main.go index de8f2534..cd0bf3cb 100644 --- a/cmd/venus-market/main.go +++ b/cmd/venus-market/main.go @@ -5,9 +5,7 @@ import ( "os" "strings" - "github.com/filecoin-project/venus-market/version" logging "github.com/ipfs/go-log/v2" - "github.com/urfave/cli/v2" "golang.org/x/xerrors" @@ -19,6 +17,7 @@ import ( "github.com/filecoin-project/venus-market/config" _ "github.com/filecoin-project/venus-market/network" "github.com/filecoin-project/venus-market/piecestorage" + "github.com/filecoin-project/venus-market/version" _ "github.com/filecoin-project/venus/pkg/crypto/bls" _ "github.com/filecoin-project/venus/pkg/crypto/secp" @@ -111,9 +110,14 @@ var ( Usage: "auth token for connect wallet service", } + ExternalFsPieceStorageFlag = &cli.StringFlag{ + Name: "ex-fs-ps", + Usage: "config external file system storage for piece (eg /mnt/store/f01000}", + } + PieceStorageFlag = &cli.StringFlag{ Name: "piecestorage", - Usage: "config storage for piece (eg fs:/mnt/piece s3:{access key}:{secret key}:{option token}@{region}host/{bucket}", + Usage: "config storage for piece (eg fs:/mnt/piece s3:{access key}:{secret key}:{option token}@{region}host/{bucket})", } MysqlDsnFlag = &cli.StringFlag{ @@ -253,6 +257,14 @@ func flagData(cctx *cli.Context, cfg *config.MarketConfig) error { cfg.PieceStorage = pieceStorage } + if cctx.IsSet(ExternalFsPieceStorageFlag.Name) { + cfg.ExternalFsPieceStore.Paths = make([]string, 0) + paths := cctx.StringSlice(ExternalFsPieceStorageFlag.Name) + for _, path := range paths { + cfg.ExternalFsPieceStore.Paths = append(cfg.ExternalFsPieceStore.Paths, path) + } + } + if cctx.IsSet(MysqlDsnFlag.Name) { cfg.Mysql.ConnectionString = cctx.String(MysqlDsnFlag.Name) } @@ -269,6 +281,7 @@ func flagData(cctx *cli.Context, cfg *config.MarketConfig) error { if len(addrStr) >= 2 { account = addrStr[1] } + // todo 这里是追加不是替换 cfg.StorageMiners = append(cfg.StorageMiners, config.User{ Addr: config.Address(addr), Account: account, diff --git a/config/config.go b/config/config.go index 5b21f1a6..b291f111 100644 --- a/config/config.go +++ b/config/config.go @@ -173,6 +173,11 @@ type S3PieceStorage struct { Token string } +// todo 用作检索,不需要显式标注 ReadOnly +type ExternalFsPieceStore struct { + Paths []string +} + type User struct { Addr Address Account string @@ -191,10 +196,12 @@ type MarketConfig struct { Mysql Mysql - PieceStorage PieceStorage - Journal Journal - AddressConfig AddressConfig - DAGStore DAGStoreConfig + PieceStorage PieceStorage + // `Pieces` stored when offline deals or not connected to the chain service + ExternalFsPieceStore ExternalFsPieceStore + Journal Journal + AddressConfig AddressConfig + DAGStore DAGStoreConfig StorageMiners []User RetrievalPaymentAddress User diff --git a/config/def_config.go b/config/def_config.go index 94627919..d2f34361 100644 --- a/config/def_config.go +++ b/config/def_config.go @@ -59,10 +59,15 @@ var DefaultMarketConfig = &MarketConfig{ GCInterval: Duration(1 * time.Minute), }, Journal: Journal{Path: "journal"}, - PieceStorage: PieceStorage{Fs: FsPieceStorage{ - Enable: true, - Path: "/mnt/piece", - }}, + PieceStorage: PieceStorage{ + Fs: FsPieceStorage{ + Enable: true, + Path: "/mnt/piece", + }, + }, + ExternalFsPieceStore: ExternalFsPieceStore{ + Paths: []string{}, + }, ConsiderOnlineStorageDeals: true, ConsiderOfflineStorageDeals: true, ConsiderOnlineRetrievalDeals: true, diff --git a/config/modules.go b/config/modules.go index d50d0fcc..916357c6 100644 --- a/config/modules.go +++ b/config/modules.go @@ -136,6 +136,7 @@ var ConfigServerOpts = func(cfg *MarketConfig) builder.Option { builder.Override(new(*Mysql), &cfg.Mysql), builder.Override(new(*Libp2p), &cfg.Libp2p), builder.Override(new(*PieceStorage), &cfg.PieceStorage), + builder.Override(new(*ExternalFsPieceStore), &cfg.ExternalFsPieceStore), builder.Override(new(*DAGStoreConfig), &cfg.DAGStore), // Config (todo: get a real property system) diff --git a/dagstore/market_api.go b/dagstore/market_api.go index a8a10f7f..0c8f65f2 100644 --- a/dagstore/market_api.go +++ b/dagstore/market_api.go @@ -4,13 +4,15 @@ import ( "context" "io" + "github.com/ipfs/go-cid" + "golang.org/x/xerrors" + + "github.com/filecoin-project/dagstore/throttle" "github.com/filecoin-project/go-padreader" + "github.com/filecoin-project/venus-market/models/repo" "github.com/filecoin-project/venus-market/piecestorage" - - "github.com/filecoin-project/dagstore/throttle" - "github.com/ipfs/go-cid" - "golang.org/x/xerrors" + "github.com/filecoin-project/venus-market/piecestorage/external" ) type MarketAPI interface { @@ -21,18 +23,20 @@ type MarketAPI interface { } type marketAPI struct { - pieceStorage piecestorage.IPieceStorage - pieceRepo repo.StorageDealRepo - throttle throttle.Throttler + pieceStorage piecestorage.IPieceStorage + exFsPieceStorage external.IExternalPieceStorage + pieceRepo repo.StorageDealRepo + throttle throttle.Throttler } var _ MarketAPI = (*marketAPI)(nil) -func NewMinerAPI(repo repo.Repo, pieceStorage piecestorage.IPieceStorage, concurrency int) MarketAPI { +func NewMinerAPI(repo repo.Repo, pieceStorage piecestorage.IPieceStorage, exFsPieceStorage external.IExternalPieceStorage, concurrency int) MarketAPI { return &marketAPI{ - pieceRepo: repo.StorageDealRepo(), - pieceStorage: pieceStorage, - throttle: throttle.Fixed(concurrency), + pieceRepo: repo.StorageDealRepo(), + pieceStorage: pieceStorage, + exFsPieceStorage: exFsPieceStorage, + throttle: throttle.Fixed(concurrency), } } @@ -41,7 +45,16 @@ func (m *marketAPI) Start(_ context.Context) error { } func (m *marketAPI) IsUnsealed(ctx context.Context, pieceCid cid.Cid) (bool, error) { - return m.pieceStorage.Has(ctx, pieceCid.String()) + bUnsealed, err := m.pieceStorage.Has(ctx, pieceCid.String()) + if err != nil { + return bUnsealed, err + } + + if bUnsealed { + return bUnsealed, nil + } + + return m.exFsPieceStorage.Has(ctx, pieceCid.String()) } func (m *marketAPI) FetchUnsealedPiece(ctx context.Context, pieceCid cid.Cid) (io.ReadCloser, error) { @@ -64,9 +77,23 @@ func (m *marketAPI) FetchUnsealedPiece(ctx context.Context, pieceCid cid.Cid) (i return nil, err } return iocloser{r, padR}, nil + } else { + if has, _ = m.exFsPieceStorage.Has(ctx, pieceCid.String()); has { + r, err := m.exFsPieceStorage.Read(ctx, pieceCid.String()) + if err != nil { + return nil, err + } + + padR, err := padreader.NewInflator(r, uint64(payloadSize), pieceSize.Unpadded()) + if err != nil { + return nil, err + } + + return iocloser{r, padR}, nil + } } - // todo unseal ask miner who have this data, send unseal cmd, and read and pay after receive data + // todo unseal: ask miner who have this data, send unseal cmd, and read and pay after receive data // 1. select miner // 2. send unseal request // 3. receive data and return diff --git a/dagstore/modules.go b/dagstore/modules.go index e2f1b2b4..a74c114f 100644 --- a/dagstore/modules.go +++ b/dagstore/modules.go @@ -6,15 +6,18 @@ import ( "path/filepath" "strconv" + "go.uber.org/fx" + "golang.org/x/xerrors" + "github.com/filecoin-project/dagstore" "github.com/filecoin-project/go-fil-markets/stores" + "github.com/ipfs-force-community/venus-common-utils/builder" - "go.uber.org/fx" - "golang.org/x/xerrors" "github.com/filecoin-project/venus-market/config" "github.com/filecoin-project/venus-market/models/repo" "github.com/filecoin-project/venus-market/piecestorage" + "github.com/filecoin-project/venus-market/piecestorage/external" ) var ( @@ -27,8 +30,8 @@ const ( ) // NewMinerAPI creates a new MarketAPI adaptor for the dagstore mounts. -func NewMarketAPI(lc fx.Lifecycle, r *config.DAGStoreConfig, repo repo.Repo, pieceStorage piecestorage.IPieceStorage) (MarketAPI, error) { - mountApi := NewMinerAPI(repo, pieceStorage, r.MaxConcurrencyStorageCalls) +func NewMarketAPI(lc fx.Lifecycle, r *config.DAGStoreConfig, repo repo.Repo, pieceStorage piecestorage.IPieceStorage, exFsPieceStorage external.IExternalPieceStorage) (MarketAPI, error) { + mountApi := NewMinerAPI(repo, pieceStorage, exFsPieceStorage, r.MaxConcurrencyStorageCalls) lc.Append(fx.Hook{ OnStart: func(ctx context.Context) error { return mountApi.Start(ctx) diff --git a/piecestorage/client_s3.go b/piecestorage/client_s3.go index 2001db6f..c540526f 100644 --- a/piecestorage/client_s3.go +++ b/piecestorage/client_s3.go @@ -78,7 +78,7 @@ func (c PresignS3Storage) Has(ctx context.Context, s string) (bool, error) { panic("implement me") } -func (c PresignS3Storage) Validate(s string) error { +func (c PresignS3Storage) Validate() error { if c.presignUrl == nil { return fmt.Errorf("client s3 storage must has presign url") } diff --git a/piecestorage/external/filestore.go b/piecestorage/external/filestore.go new file mode 100644 index 00000000..dd9f297c --- /dev/null +++ b/piecestorage/external/filestore.go @@ -0,0 +1,131 @@ +package external + +import ( + "context" + "fmt" + "io" + "os" + "path" + + "golang.org/x/xerrors" + + logging "github.com/ipfs/go-log/v2" + + "github.com/filecoin-project/venus-market/config" + "github.com/filecoin-project/venus-market/utils" +) + +var log = logging.Logger("external-pieces") + +type IExternalPieceStorage interface { + Read(context.Context, string) (io.ReadCloser, error) + Len(ctx context.Context, string2 string) (int64, error) + ReadOffset(context.Context, string, int, int) (io.ReadCloser, error) + Has(context.Context, string) (bool, error) +} + +type externalFsPieceStorage struct { + paths []string +} + +func NewExternalFsPieceStorage(cfg *config.ExternalFsPieceStore) (IExternalPieceStorage, error) { + fs := externalFsPieceStorage{paths: make([]string, 0)} + for _, psPath := range cfg.Paths { + if err := fs.validate(psPath); err != nil { + log.Errorf("%s validate failed: %s", psPath, err.Error()) + } else { + fs.paths = append(fs.paths, psPath) + } + } + + return &fs, nil +} + +func (ef externalFsPieceStorage) Len(ctx context.Context, pCidStr string) (int64, error) { + for _, psPath := range ef.paths { + st, err := os.Stat(path.Join(psPath, pCidStr)) + if err != nil { + log.Errorf("%s stat error: %s", pCidStr, err.Error()) + continue + } + + if st.IsDir() { + log.Errorf("%s is not a file", pCidStr) + continue + } + + return st.Size(), nil + } + + return 0, fmt.Errorf("file does not exist") +} + +func (ef externalFsPieceStorage) Read(ctx context.Context, pCidStr string) (io.ReadCloser, error) { + for _, psPath := range ef.paths { + st, err := os.Stat(path.Join(psPath, pCidStr)) + if err != nil { + log.Errorf("%s stat error: %s", pCidStr, err.Error()) + continue + } + + if st.IsDir() { + log.Errorf("%s is not a file", pCidStr) + continue + } + + return os.Open(path.Join(psPath, pCidStr)) + } + + return nil, fmt.Errorf("file does not exist") +} + +func (ef externalFsPieceStorage) ReadOffset(ctx context.Context, pCidStr string, offset int, size int) (io.ReadCloser, error) { + for _, psPath := range ef.paths { + dstPath := path.Join(psPath, pCidStr) + fs, err := os.Open(dstPath) + if err != nil { + log.Errorf("failed to open file %s: %s", pCidStr, err.Error()) + continue + } + + _, err = fs.Seek(int64(offset), 0) + if err != nil { + return nil, fmt.Errorf("failed to seek position to %d in file %s %s", offset, dstPath, err) + } + + return utils.NewLimitedBufferReader(fs, int(size)), nil + } + + return nil, fmt.Errorf("file does not exist") +} + +func (ef externalFsPieceStorage) Has(ctx context.Context, pCidStr string) (bool, error) { + for _, psPath := range ef.paths { + st, err := os.Stat(path.Join(psPath, pCidStr)) + if err != nil { + log.Errorf("%s stat error: %s", pCidStr, err.Error()) + continue + } + + if st.IsDir() { + log.Errorf("%s is not a file", pCidStr) + continue + } + + return true, nil + } + + return false, nil +} + +func (ef externalFsPieceStorage) validate(psPath string) error { + st, err := os.Stat(psPath) + if err != nil { + return err + } + + if !st.IsDir() { + return xerrors.Errorf("expect a directory but got file") + } + return nil +} diff --git a/piecestorage/filestore.go b/piecestorage/filestore.go index 91890cf2..527b76c6 100644 --- a/piecestorage/filestore.go +++ b/piecestorage/filestore.go @@ -8,9 +8,10 @@ import ( "os" "path" + xerrors "github.com/pkg/errors" + "github.com/filecoin-project/venus-market/config" "github.com/filecoin-project/venus-market/utils" - xerrors "github.com/pkg/errors" ) type IPreSignOp interface { @@ -25,7 +26,7 @@ type IPieceStorage interface { Len(ctx context.Context, string2 string) (int64, error) ReadOffset(context.Context, string, int, int) (io.ReadCloser, error) Has(context.Context, string) (bool, error) - Validate(s string) error + Validate() error IPreSignOp } @@ -74,7 +75,7 @@ func (f fsPieceStorage) ReadOffset(ctx context.Context, s string, offset int, si } _, err = fs.Seek(int64(offset), 0) if err != nil { - return nil, fmt.Errorf("unable to seek position to %din file %s %w", offset, dstPath, err) + return nil, fmt.Errorf("unable to seek position to %d in file %s %w", offset, dstPath, err) } return utils.NewLimitedBufferReader(fs, size), nil } @@ -90,7 +91,7 @@ func (f fsPieceStorage) Has(ctx context.Context, s string) (bool, error) { return true, nil } -func (f fsPieceStorage) Validate(s string) error { +func (f fsPieceStorage) Validate() error { st, err := os.Stat(f.baseUrl) if err != nil { if os.IsNotExist(err) { @@ -119,7 +120,7 @@ func (f fsPieceStorage) GetWriteUrl(ctx context.Context, s2 string) (string, err func newFsPieceStorage(fsCfg config.FsPieceStorage) (IPieceStorage, error) { fs := &fsPieceStorage{baseUrl: fsCfg.Path} - if err := fs.Validate(fsCfg.Path); err != nil { + if err := fs.Validate(); err != nil { return nil, err } return fs, nil diff --git a/piecestorage/modules.go b/piecestorage/modules.go index 7b6a9205..2d9f81c3 100644 --- a/piecestorage/modules.go +++ b/piecestorage/modules.go @@ -4,17 +4,23 @@ import ( "reflect" "strings" - "github.com/filecoin-project/venus-market/config" - "github.com/ipfs-force-community/venus-common-utils/builder" "golang.org/x/xerrors" + + "github.com/ipfs-force-community/venus-common-utils/builder" + + "github.com/filecoin-project/venus-market/config" + "github.com/filecoin-project/venus-market/piecestorage/external" ) var PieceStorageOpts = func(cfg *config.MarketConfig) builder.Option { return builder.Options( - //piece + // piece builder.Override(new(IPieceStorage), func(cfg *config.PieceStorage) (IPieceStorage, error) { return NewPieceStorage(cfg) - }), //save read piece data + }), // save read piece data + builder.Override(new(external.IExternalPieceStorage), func(cfg *config.ExternalFsPieceStore) (external.IExternalPieceStorage, error) { + return external.NewExternalFsPieceStorage(cfg) + }), ) } diff --git a/piecestorage/protocol_test.go b/piecestorage/protocol_test.go index 03d8cf3a..d73227af 100644 --- a/piecestorage/protocol_test.go +++ b/piecestorage/protocol_test.go @@ -40,7 +40,7 @@ func TestParserProtocol(t *testing.T) { wantErr: false, }, { - name: "s3 success", + name: "s3 fail", protocol: "s3:ak:sk:t1@http://region1.s3.com", wantErr: true, }, diff --git a/piecestorage/s3.go b/piecestorage/s3.go index 392adb19..7e44b58f 100644 --- a/piecestorage/s3.go +++ b/piecestorage/s3.go @@ -8,16 +8,18 @@ import ( "strings" "time" + logging "github.com/ipfs/go-log/v2" + "golang.org/x/xerrors" + "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/awserr" "github.com/aws/aws-sdk-go/aws/credentials" "github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/service/s3" "github.com/aws/aws-sdk-go/service/s3/s3manager" + "github.com/filecoin-project/venus-market/config" "github.com/filecoin-project/venus-market/utils" - logging "github.com/ipfs/go-log/v2" - "golang.org/x/xerrors" ) var log = logging.Logger("piece-storage") @@ -175,9 +177,9 @@ func (s s3PieceStorage) Has(ctx context.Context, piececid string) (bool, error) //todo 下面presign两个方法用于给客户端使用,暂时仅仅支持对象存储。 可能需要一个更合适的抽象模式 func (s s3PieceStorage) GetReadUrl(ctx context.Context, s2 string) (string, error) { if has, err := s.Has(ctx, s2); err != nil { - return "", xerrors.Errorf("check object:%s exist error:%w", s2, err) + return "", xerrors.Errorf("check object: %s exist error:%w", s2, err) } else if !has { - return "", xerrors.Errorf("object : %s not exists", s2) + return "", xerrors.Errorf("object: %s not exists", s2) } params := &s3.GetObjectInput{ @@ -197,7 +199,7 @@ func (s s3PieceStorage) GetWriteUrl(ctx context.Context, s2 string) (string, err return req.Presign(time.Minute * 30) } -func (s s3PieceStorage) Validate(piececid string) error { +func (s s3PieceStorage) Validate() error { _, err := s.s3Client.GetBucketAcl(&s3.GetBucketAclInput{ Bucket: aws.String(s.bucket), }) diff --git a/retrievalprovider/modules.go b/retrievalprovider/modules.go index ec8b3f83..cb321e1f 100644 --- a/retrievalprovider/modules.go +++ b/retrievalprovider/modules.go @@ -74,8 +74,6 @@ func RetrievalNetwork(h host.Host) rmnet.RetrievalMarketNetwork { var RetrievalProviderOpts = func(cfg *config.MarketConfig) builder.Option { return builder.Options( - - builder.Override(new(rmnet.RetrievalMarketNetwork), RetrievalNetwork), // Markets (retrieval) builder.Override(new(rmnet.RetrievalMarketNetwork), RetrievalNetwork), builder.Override(new(IRetrievalProvider), NewProvider), // save to metadata /retrievals/provider diff --git a/storageprovider/deal_assigner.go b/storageprovider/deal_assigner.go index 4cb41d9a..0bbe6c94 100644 --- a/storageprovider/deal_assigner.go +++ b/storageprovider/deal_assigner.go @@ -14,7 +14,6 @@ import ( "github.com/filecoin-project/go-state-types/abi" "github.com/ipfs/go-cid" - "github.com/filecoin-project/venus-market/config" "github.com/filecoin-project/venus-market/models/repo" types "github.com/filecoin-project/venus/venus-shared/types/market" ) @@ -32,8 +31,8 @@ var _ DealAssiger = (*dealAssigner)(nil) // NewProviderPieceStore creates a statestore for storing metadata about pieces // shared by the piecestorage and retrieval providers -func NewDealAssigner(lc fx.Lifecycle, pieceStorage *config.PieceStorage, r repo.Repo) (DealAssiger, error) { - ps, err := newPieceStoreEx(pieceStorage, r) +func NewDealAssigner(lc fx.Lifecycle, r repo.Repo) (DealAssiger, error) { + ps, err := newPieceStoreEx(r) if err != nil { return nil, xerrors.Errorf("construct extend piece store %w", err) } @@ -41,15 +40,12 @@ func NewDealAssigner(lc fx.Lifecycle, pieceStorage *config.PieceStorage, r repo. } type dealAssigner struct { - pieceStorage config.PieceStorage - repo repo.Repo + repo repo.Repo } // NewDsPieceStore returns a new piecestore based on the given datastore -func newPieceStoreEx(pieceStorage *config.PieceStorage, r repo.Repo) (DealAssiger, error) { +func newPieceStoreEx(r repo.Repo) (DealAssiger, error) { return &dealAssigner{ - pieceStorage: *pieceStorage, - repo: r, }, nil } From cf3ee1f088b68b5795e741c296e0ad35f7db9967 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=B8=80=E9=A1=B5=E7=B4=A0=E4=B9=A6?= <2931107265@qq.com> Date: Fri, 18 Mar 2022 10:10:20 +0800 Subject: [PATCH 2/5] PieceStore: import force-seal deal --- tools/import-deal/main.go | 95 ++++++++++++++++++++++++ tools/import-deal/types/types.go | 120 +++++++++++++++++++++++++++++++ 2 files changed, 215 insertions(+) create mode 100644 tools/import-deal/main.go create mode 100644 tools/import-deal/types/types.go diff --git a/tools/import-deal/main.go b/tools/import-deal/main.go new file mode 100644 index 00000000..32118145 --- /dev/null +++ b/tools/import-deal/main.go @@ -0,0 +1,95 @@ +package main + +import ( + "flag" + "fmt" + "time" + + "gorm.io/driver/mysql" + "gorm.io/gorm" + + "github.com/filecoin-project/venus-market/tools/import-deal/types" +) + +func ImportDealsToMysql(srcConn, conn string) error { + var ( + maxOpenConn int = 10 + maxIdleConn int = 10 + ) + + db, err := gorm.Open(mysql.Open(srcConn)) + if err != nil { + return err + } + + db.Set("gorm:table_options", "CHARSET=utf8mb4") + db = db.Debug() + + sqlDB, err := db.DB() + if err != nil { + return err + } + defer func() { + _ = sqlDB.Close() + }() + + sqlDB.SetMaxOpenConns(maxOpenConn) + sqlDB.SetMaxIdleConns(maxIdleConn) + sqlDB.SetConnMaxLifetime(30 * time.Second) + + forceDeals := []*types.ForceDeal{} + if err := db.Table("deals").Find(&forceDeals).Error; err != nil { + return err + } + + // venus-market deals + dstDb, err := gorm.Open(mysql.Open(conn)) + if err != nil { + return err + } + + dstDb.Set("gorm:table_options", "CHARSET=utf8mb4") + dstDb = dstDb.Debug() + + dstSqlDB, err := dstDb.DB() + if err != nil { + return err + } + defer func() { + _ = dstSqlDB.Close() + }() + + dstSqlDB.SetMaxOpenConns(maxOpenConn) + dstSqlDB.SetMaxIdleConns(maxIdleConn) + dstSqlDB.SetConnMaxLifetime(30 * time.Second) + + deals := make([]types.Deal, 0, len(forceDeals)) + for _, deal := range forceDeals { + deals = append(deals, *deal.ToDeal()) + } + + if err = dstDb.AutoMigrate(&types.Deal{}); err != nil { + return err + } + + return dstDb.Create(&deals).Error +} + +func main() { + // mysql: user:password@tcp(localhost:3308)/db-name?loc=Local&parseTime=true&innodb_lock_wait_timeout=10 + var ( + srcConn, conn string + ) + + flag.StringVar(&srcConn, "src-conn", "", "mysql conn for src") + flag.StringVar(&conn, "conn", "", "mysql conn for market") + + flag.Parse() + + if err := ImportDealsToMysql(srcConn, conn); err != nil { + fmt.Printf("import deals to mysql err: %s\n", err.Error()) + return + } + + fmt.Println("import success.") +} diff --git a/tools/import-deal/types/types.go b/tools/import-deal/types/types.go new file mode 100644 index 00000000..ab8a81b7 --- /dev/null +++ b/tools/import-deal/types/types.go @@ -0,0 +1,120 @@ +package types + +import ( + "time" + + "github.com/filecoin-project/go-address" + "github.com/filecoin-project/go-fil-markets/storagemarket" + "github.com/ipfs/go-cid" + + "github.com/filecoin-project/venus-messager/models/mtypes" + + "github.com/filecoin-project/venus-market/models/mysql" +) + +const ( + TTFromForce = "import" +) + +type ForceDeal struct { + ID uint64 `gorm:"primary_key;column:id;type:bigint(20) unsigned AUTO_INCREMENT;not null" json:"id"` + Dealid uint64 `gorm:"column:dealid;unique_index:uq_dealid_ttype;type:bigint(20) unsigned" json:"dealid"` + Sectorid uint64 `gorm:"column:sectorid;type:bigint(20) unsigned;index:idx_sector_id" json:"sectorid"` + Offset uint64 `gorm:"column:offset;type:bigint(20) unsigned" json:"offset"` + Unpadsize uint64 `gorm:"column:unpadsize;type:bigint(20) unsigned" json:"unpadsize"` + Filesize uint64 `gorm:"column:filesize;type:bigint(20) unsigned" json:"filesize"` + Finish bool `gorm:"column:finish;type:tinyint(3) unsigned" json:"finish"` + Start uint64 `gorm:"column:start;type:bigint(20) unsigned" json:"start"` + End uint64 `gorm:"column:end;type:bigint(20) unsigned" json:"end"` + Fetch bool `gorm:"column:fetch;type:tinyint(3) unsigned" json:"fetch"` + Piececid string `gorm:"column:piececid;type:varchar(255)" json:"piececid"` + Rootcid string `gorm:"column:rootcid;type:varchar(255)" json:"rootcid"` + Client string `gorm:"column:client;type:varchar(255)" json:"client"` + Provider string `gorm:"column:provider;type:varchar(255)" json:"provider"` + ClientCollateral uint64 `gorm:"column:ccollateral;type:bigint(20) unsigned" json:"ccollateral"` + Price uint64 `gorm:"column:price;type:bigint(20) unsigned" json:"price"` + Createtime time.Time `gorm:"column:createtime;type:timestamp" json:"createtime"` + Proposalcid string `gorm:"column:proposalcid;type:varchar(255)" json:"proposalcid"` + Peerid string `gorm:"column:peerid;type:varchar(255)" json:"peerid"` + Isabandon bool `gorm:"column:isabandon;not null;type:tinyint(3) unsigned" json:"isabandon"` +} + +func convertBigInt(v uint64) mtypes.Int { + return mtypes.NewInt(int64(v)) +} + +func (fd *ForceDeal) ToDeal() *Deal { + pCid, _ := cid.Decode(fd.Piececid) + rootCid, _ := cid.Decode(fd.Rootcid) + client, _ := address.NewFromString(fd.Client) + provider, _ := address.NewFromString(fd.Provider) + proposalCid, _ := cid.Decode(fd.Proposalcid) + + md := &Deal{ + ClientDealProposal: mysql.ClientDealProposal{ + PieceCID: mysql.DBCid(pCid), + Client: mysql.DBAddress(client), + Provider: mysql.DBAddress(provider), + StartEpoch: int64(fd.Start), + EndEpoch: int64(fd.End), + StoragePricePerEpoch: convertBigInt(fd.Price), + ClientCollateral: convertBigInt(fd.ClientCollateral), + }, + ProposalCid: mysql.DBCid(proposalCid), + Client: fd.Peerid, + State: storagemarket.StorageDealActive, + PayloadSize: int64(fd.Filesize), + Ref: mysql.DataRef{ + TransferType: TTFromForce, + Root: mysql.DBCid(rootCid), + + PieceCid: mysql.UndefDBCid, + }, + DealID: uint64(fd.Dealid), + CreationTime: fd.Createtime.UnixNano(), + SectorNumber: uint64(fd.Sectorid), + + Offset: fd.Offset, + } + + return md +} + +type Deal struct { + mysql.ClientDealProposal `gorm:"embedded;embeddedPrefix:cdp_"` + + ProposalCid mysql.DBCid `gorm:"column:proposal_cid;type:varchar(256);primary_key"` + AddFundsCid mysql.DBCid `gorm:"column:add_funds_cid;type:varchar(256);"` + PublishCid mysql.DBCid `gorm:"column:publish_cid;type:varchar(256);"` + Miner string `gorm:"column:miner_peer;type:varchar(128);"` + Client string `gorm:"column:client_peer;type:varchar(128);"` + State uint64 `gorm:"column:state;type:bigint unsigned;index"` + + PayloadSize int64 `gorm:"column:payload_size;type:bigint;"` + PiecePath string `gorm:"column:piece_path;type:varchar(256);"` + MetadataPath string `gorm:"column:metadata_path;type:varchar(256);"` + SlashEpoch int64 `gorm:"column:slash_epoch;type:bigint;"` + FastRetrieval bool `gorm:"column:fast_retrieval;"` + Message string `gorm:"column:message;type:varchar(512);"` + FundsReserved mtypes.Int `gorm:"column:funds_reserved;type:varchar(256);"` + Ref mysql.DataRef `gorm:"embedded;embeddedPrefix:ref_"` + AvailableForRetrieval bool `gorm:"column:available_for_retrieval;"` + + DealID uint64 `gorm:"column:deal_id;type:bigint unsigned;index"` + CreationTime int64 `gorm:"column:creation_time;type:bigint;"` + + TransferChannelId mysql.ChannelID `gorm:"embedded;embeddedPrefix:tci_"` + SectorNumber uint64 `gorm:"column:sector_number;type:bigint unsigned;"` + + InboundCAR string `gorm:"column:addr;type:varchar(256);"` + + Offset uint64 `gorm:"column:offset;type:bigint"` + Length uint64 `gorm:"column:length;type:bigint"` + PieceStatus string `gorm:"column:piece_status;type:varchar(128);index"` + + mysql.TimeStampOrm +} + +func (Deal) TableName() string { + return "storage_deals" +} From 4db88c663aa908637a6064e7ba8bba588a08c9e2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=B8=80=E9=A1=B5=E7=B4=A0=E4=B9=A6?= <2931107265@qq.com> Date: Tue, 22 Mar 2022 10:30:27 +0800 Subject: [PATCH 3/5] import-deals: set default miner peerID --- api/impl/venus_market.go | 23 ++++++++++------------- go.mod | 2 +- models/mysql/retrieval_deal.go | 7 ++++--- models/mysql/storage_deal.go | 10 ++++++---- tools/import-deal/types/types.go | 5 +++++ 5 files changed, 26 insertions(+), 21 deletions(-) diff --git a/api/impl/venus_market.go b/api/impl/venus_market.go index da47b6a6..08891363 100644 --- a/api/impl/venus_market.go +++ b/api/impl/venus_market.go @@ -9,13 +9,6 @@ import ( "sort" "time" - "github.com/filecoin-project/venus-market/models/repo" - - "github.com/filecoin-project/venus-market/minermgr" - "github.com/filecoin-project/venus-market/retrievalprovider" - "github.com/filecoin-project/venus/venus-shared/actors/builtin/paych" - - "github.com/filecoin-project/venus-market/config" "github.com/ipfs/go-cid" logging "github.com/ipfs/go-log/v2" "github.com/libp2p/go-libp2p-core/host" @@ -33,17 +26,21 @@ import ( "github.com/filecoin-project/go-state-types/abi" clients2 "github.com/filecoin-project/venus-market/api/clients" + "github.com/filecoin-project/venus-market/config" + "github.com/filecoin-project/venus-market/minermgr" + "github.com/filecoin-project/venus-market/models/repo" "github.com/filecoin-project/venus-market/network" + "github.com/filecoin-project/venus-market/paychmgr" "github.com/filecoin-project/venus-market/piecestorage" + "github.com/filecoin-project/venus-market/retrievalprovider" "github.com/filecoin-project/venus-market/storageprovider" - marketapi "github.com/filecoin-project/venus/venus-shared/api/market" - types "github.com/filecoin-project/venus/venus-shared/types/market" - - "github.com/filecoin-project/venus-market/paychmgr" "github.com/filecoin-project/venus/pkg/constants" + "github.com/filecoin-project/venus/venus-shared/actors/builtin/paych" v1api "github.com/filecoin-project/venus/venus-shared/api/chain/v1" + marketapi "github.com/filecoin-project/venus/venus-shared/api/market" vTypes "github.com/filecoin-project/venus/venus-shared/types" + types "github.com/filecoin-project/venus/venus-shared/types/market" ) var _ marketapi.IMarket = (*MarketNodeImpl)(nil) @@ -174,12 +171,12 @@ func (m MarketNodeImpl) MarketListIncompleteDeals(ctx context.Context, mAddr add if mAddr == address.Undef { deals, err = m.Repo.StorageDealRepo().ListDeal(ctx) if err != nil { - return nil, err + return nil, fmt.Errorf("get deal: %s", err) } } else { deals, err = m.Repo.StorageDealRepo().ListDealByAddr(ctx, mAddr) if err != nil { - return nil, err + return nil, fmt.Errorf("get deal for %s: %s", mAddr.String(), err) } } diff --git a/go.mod b/go.mod index 33cceea8..d18c652e 100644 --- a/go.mod +++ b/go.mod @@ -25,6 +25,7 @@ require ( github.com/filecoin-project/go-state-types v0.1.3 github.com/filecoin-project/go-statemachine v1.0.1 github.com/filecoin-project/go-statestore v0.2.0 + github.com/filecoin-project/specs-actors/v2 v2.3.6 github.com/filecoin-project/specs-actors/v7 v7.0.0-rc1 github.com/filecoin-project/specs-storage v0.2.0 github.com/filecoin-project/venus v1.2.3-0.20220316080815-811c837d0e10 @@ -75,7 +76,6 @@ require ( github.com/libp2p/go-libp2p-yamux v0.7.0 github.com/libp2p/go-maddr-filter v0.1.0 github.com/mitchellh/go-homedir v1.1.0 - github.com/modern-go/reflect2 v1.0.2 github.com/multiformats/go-base32 v0.0.3 github.com/multiformats/go-multiaddr v0.4.1 github.com/multiformats/go-multibase v0.0.3 diff --git a/models/mysql/retrieval_deal.go b/models/mysql/retrieval_deal.go index c8158e2f..208fb939 100644 --- a/models/mysql/retrieval_deal.go +++ b/models/mysql/retrieval_deal.go @@ -2,6 +2,7 @@ package mysql import ( "context" + "fmt" "time" datatransfer "github.com/filecoin-project/go-data-transfer" @@ -120,7 +121,7 @@ func toProviderDealState(deal *retrievalDeal) (*types.ProviderDealState, error) if len(deal.Receiver) > 0 { newdeal.Receiver, err = decodePeerId(deal.Receiver) if err != nil { - return nil, err + return nil, fmt.Errorf("decode receiver: %s", err) } } @@ -129,11 +130,11 @@ func toProviderDealState(deal *retrievalDeal) (*types.ProviderDealState, error) newdeal.ChannelID.ID = datatransfer.TransferID(deal.ChannelID.ID) newdeal.ChannelID.Initiator, err = decodePeerId(deal.ChannelID.Initiator) if err != nil { - return nil, err + return nil, fmt.Errorf("decode ci_initiator: %s", err) } newdeal.ChannelID.Responder, err = decodePeerId(deal.ChannelID.Responder) if err != nil { - return nil, err + return nil, fmt.Errorf("decode ci_responder: %s", err) } } return newdeal, nil diff --git a/models/mysql/storage_deal.go b/models/mysql/storage_deal.go index 6e5602f9..763426c8 100644 --- a/models/mysql/storage_deal.go +++ b/models/mysql/storage_deal.go @@ -253,21 +253,23 @@ func toStorageDeal(src *storageDeal) (*types.MinerDeal, error) { md.TransferChannelID.ID = datatransfer.TransferID(src.TransferChannelId.ID) md.TransferChannelID.Initiator, err = decodePeerId(src.TransferChannelId.Initiator) if err != nil { - return nil, err + return nil, fmt.Errorf("decode tci_initiator: %s", err) } md.TransferChannelID.Responder, err = decodePeerId(src.TransferChannelId.Responder) if err != nil { - return nil, err + return nil, fmt.Errorf("decode tci_responder: %s", err) } } + // todo 导入的数据没有此字段 md.Miner, err = decodePeerId(src.Miner) if err != nil { - return nil, err + return nil, fmt.Errorf("decode miner_peer: %s", err) } + md.Client, err = decodePeerId(src.Client) if err != nil { - return nil, err + return nil, fmt.Errorf("decode client_peer: %s", err) } return md, nil diff --git a/tools/import-deal/types/types.go b/tools/import-deal/types/types.go index ab8a81b7..34f715c6 100644 --- a/tools/import-deal/types/types.go +++ b/tools/import-deal/types/types.go @@ -16,6 +16,8 @@ const ( TTFromForce = "import" ) + const DefaultPeerID = "12D3KooWQztpkQoRR1k3xmRowa3pwA8qDg9yKyiqQLGr6Y4tWkq5" + type ForceDeal struct { ID uint64 `gorm:"primary_key;column:id;type:bigint(20) unsigned AUTO_INCREMENT;not null" json:"id"` Dealid uint64 `gorm:"column:dealid;unique_index:uq_dealid_ttype;type:bigint(20) unsigned" json:"dealid"` @@ -53,6 +55,7 @@ func (fd *ForceDeal) ToDeal() *Deal { md := &Deal{ ClientDealProposal: mysql.ClientDealProposal{ PieceCID: mysql.DBCid(pCid), + VerifiedDeal: true, Client: mysql.DBAddress(client), Provider: mysql.DBAddress(provider), StartEpoch: int64(fd.Start), @@ -61,6 +64,7 @@ func (fd *ForceDeal) ToDeal() *Deal { ClientCollateral: convertBigInt(fd.ClientCollateral), }, ProposalCid: mysql.DBCid(proposalCid), + Miner: DefaultPeerID, // todo 反序列化时需要能解析 Client: fd.Peerid, State: storagemarket.StorageDealActive, PayloadSize: int64(fd.Filesize), @@ -75,6 +79,7 @@ func (fd *ForceDeal) ToDeal() *Deal { SectorNumber: uint64(fd.Sectorid), Offset: fd.Offset, + PieceStatus: "Proving", } return md From 146650d5c162ef8d8e204ef9308b8c844e370fc5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=B8=80=E9=A1=B5=E7=B4=A0=E4=B9=A6?= <2931107265@qq.com> Date: Tue, 22 Mar 2022 15:45:17 +0800 Subject: [PATCH 4/5] fix Validate def for IPieceStorage --- piecestorage/client_s3.go | 2 +- piecestorage/filestore.go | 6 +++--- piecestorage/s3.go | 2 +- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/piecestorage/client_s3.go b/piecestorage/client_s3.go index c540526f..2001db6f 100644 --- a/piecestorage/client_s3.go +++ b/piecestorage/client_s3.go @@ -78,7 +78,7 @@ func (c PresignS3Storage) Has(ctx context.Context, s string) (bool, error) { panic("implement me") } -func (c PresignS3Storage) Validate() error { +func (c PresignS3Storage) Validate(s string) error { if c.presignUrl == nil { return fmt.Errorf("client s3 storage must has presign url") } diff --git a/piecestorage/filestore.go b/piecestorage/filestore.go index 527b76c6..5695d263 100644 --- a/piecestorage/filestore.go +++ b/piecestorage/filestore.go @@ -26,7 +26,7 @@ type IPieceStorage interface { Len(ctx context.Context, string2 string) (int64, error) ReadOffset(context.Context, string, int, int) (io.ReadCloser, error) Has(context.Context, string) (bool, error) - Validate() error + Validate(s string) error IPreSignOp } @@ -91,7 +91,7 @@ func (f fsPieceStorage) Has(ctx context.Context, s string) (bool, error) { return true, nil } -func (f fsPieceStorage) Validate() error { +func (f fsPieceStorage) Validate(s string) error { st, err := os.Stat(f.baseUrl) if err != nil { if os.IsNotExist(err) { @@ -120,7 +120,7 @@ func (f fsPieceStorage) GetWriteUrl(ctx context.Context, s2 string) (string, err func newFsPieceStorage(fsCfg config.FsPieceStorage) (IPieceStorage, error) { fs := &fsPieceStorage{baseUrl: fsCfg.Path} - if err := fs.Validate(); err != nil { + if err := fs.Validate(fsCfg.Path); err != nil { return nil, err } return fs, nil diff --git a/piecestorage/s3.go b/piecestorage/s3.go index 7e44b58f..12c838b3 100644 --- a/piecestorage/s3.go +++ b/piecestorage/s3.go @@ -199,7 +199,7 @@ func (s s3PieceStorage) GetWriteUrl(ctx context.Context, s2 string) (string, err return req.Presign(time.Minute * 30) } -func (s s3PieceStorage) Validate() error { +func (s s3PieceStorage) Validate(piececid string) error { _, err := s.s3Client.GetBucketAcl(&s3.GetBucketAclInput{ Bucket: aws.String(s.bucket), }) From be7a99598a5221f0e35d1dae2c2820a879a88193 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=B8=80=E9=A1=B5=E7=B4=A0=E4=B9=A6?= <2931107265@qq.com> Date: Wed, 30 Mar 2022 14:16:18 +0800 Subject: [PATCH 5/5] fix lint CI --- cmd/venus-market/main.go | 6 ++---- cmd/venus-market/pool-run.go | 2 ++ cmd/venus-market/solo-run.go | 1 + dagstore/market_api.go | 25 ++++++++++++++----------- piecestorage/external/filestore.go | 2 +- tools/import-deal/main.go | 4 ++-- tools/import-deal/types/types.go | 8 ++++---- 7 files changed, 26 insertions(+), 22 deletions(-) diff --git a/cmd/venus-market/main.go b/cmd/venus-market/main.go index cd0bf3cb..fa4118da 100644 --- a/cmd/venus-market/main.go +++ b/cmd/venus-market/main.go @@ -110,7 +110,7 @@ var ( Usage: "auth token for connect wallet service", } - ExternalFsPieceStorageFlag = &cli.StringFlag{ + ExternalFsPieceStorageFlag = &cli.StringSliceFlag{ Name: "ex-fs-ps", Usage: "config external file system storage for piece (eg /mnt/store/f01000}", } @@ -260,9 +260,7 @@ func flagData(cctx *cli.Context, cfg *config.MarketConfig) error { if cctx.IsSet(ExternalFsPieceStorageFlag.Name) { cfg.ExternalFsPieceStore.Paths = make([]string, 0) paths := cctx.StringSlice(ExternalFsPieceStorageFlag.Name) - for _, path := range paths { - cfg.ExternalFsPieceStore.Paths = append(cfg.ExternalFsPieceStore.Paths, path) - } + cfg.ExternalFsPieceStore.Paths = append(cfg.ExternalFsPieceStore.Paths, paths...) } if cctx.IsSet(MysqlDsnFlag.Name) { diff --git a/cmd/venus-market/pool-run.go b/cmd/venus-market/pool-run.go index e47d0f70..6b83323c 100644 --- a/cmd/venus-market/pool-run.go +++ b/cmd/venus-market/pool-run.go @@ -30,6 +30,7 @@ import ( "github.com/filecoin-project/venus-market/storageprovider" types2 "github.com/filecoin-project/venus-market/types" "github.com/filecoin-project/venus-market/utils" + marketapi "github.com/filecoin-project/venus/venus-shared/api/market" "github.com/filecoin-project/venus/venus-shared/api/permission" ) @@ -49,6 +50,7 @@ var poolRunCmd = &cli.Command{ GatewayUrlFlag, GatewayTokenFlag, PieceStorageFlag, + ExternalFsPieceStorageFlag, MysqlDsnFlag, MinerListFlag, PaymentAddressFlag, diff --git a/cmd/venus-market/solo-run.go b/cmd/venus-market/solo-run.go index 044ff3fb..c4dbab43 100644 --- a/cmd/venus-market/solo-run.go +++ b/cmd/venus-market/solo-run.go @@ -45,6 +45,7 @@ var soloRunCmd = &cli.Command{ WalletUrlFlag, WalletTokenFlag, PieceStorageFlag, + ExternalFsPieceStorageFlag, MysqlDsnFlag, MinerListFlag, PaymentAddressFlag, diff --git a/dagstore/market_api.go b/dagstore/market_api.go index 0c8f65f2..b06b9bb3 100644 --- a/dagstore/market_api.go +++ b/dagstore/market_api.go @@ -77,20 +77,23 @@ func (m *marketAPI) FetchUnsealedPiece(ctx context.Context, pieceCid cid.Cid) (i return nil, err } return iocloser{r, padR}, nil - } else { - if has, _ = m.exFsPieceStorage.Has(ctx, pieceCid.String()); has { - r, err := m.exFsPieceStorage.Read(ctx, pieceCid.String()) - if err != nil { - return nil, err - } + } - padR, err := padreader.NewInflator(r, uint64(payloadSize), pieceSize.Unpadded()) - if err != nil { - return nil, err - } + has, err = m.exFsPieceStorage.Has(ctx, pieceCid.String()) + if err != nil { + return nil, err + } + if has { + r, err := m.exFsPieceStorage.Read(ctx, pieceCid.String()) + if err != nil { + return nil, err + } - return iocloser{r, padR}, nil + padR, err := padreader.NewInflator(r, payloadSize, pieceSize.Unpadded()) + if err != nil { + return nil, err } + return iocloser{r, padR}, nil } // todo unseal: ask miner who have this data, send unseal cmd, and read and pay after receive data diff --git a/piecestorage/external/filestore.go b/piecestorage/external/filestore.go index dd9f297c..4de52384 100644 --- a/piecestorage/external/filestore.go +++ b/piecestorage/external/filestore.go @@ -93,7 +93,7 @@ func (ef externalFsPieceStorage) ReadOffset(ctx context.Context, pCidStr string, return nil, fmt.Errorf("failed to seek position to %d in file %s %s", offset, dstPath, err) } - return utils.NewLimitedBufferReader(fs, int(size)), nil + return utils.NewLimitedBufferReader(fs, size), nil } return nil, fmt.Errorf("file does not exist") diff --git a/tools/import-deal/main.go b/tools/import-deal/main.go index 32118145..03fd9c8f 100644 --- a/tools/import-deal/main.go +++ b/tools/import-deal/main.go @@ -13,8 +13,8 @@ import ( func ImportDealsToMysql(srcConn, conn string) error { var ( - maxOpenConn int = 10 - maxIdleConn int = 10 + maxOpenConn = 10 + maxIdleConn = 10 ) db, err := gorm.Open(mysql.Open(srcConn)) diff --git a/tools/import-deal/types/types.go b/tools/import-deal/types/types.go index 34f715c6..98fd32b6 100644 --- a/tools/import-deal/types/types.go +++ b/tools/import-deal/types/types.go @@ -16,7 +16,7 @@ const ( TTFromForce = "import" ) - const DefaultPeerID = "12D3KooWQztpkQoRR1k3xmRowa3pwA8qDg9yKyiqQLGr6Y4tWkq5" +const DefaultPeerID = "12D3KooWQztpkQoRR1k3xmRowa3pwA8qDg9yKyiqQLGr6Y4tWkq5" type ForceDeal struct { ID uint64 `gorm:"primary_key;column:id;type:bigint(20) unsigned AUTO_INCREMENT;not null" json:"id"` @@ -74,11 +74,11 @@ func (fd *ForceDeal) ToDeal() *Deal { PieceCid: mysql.UndefDBCid, }, - DealID: uint64(fd.Dealid), + DealID: fd.Dealid, CreationTime: fd.Createtime.UnixNano(), - SectorNumber: uint64(fd.Sectorid), + SectorNumber: fd.Sectorid, - Offset: fd.Offset, + Offset: fd.Offset, PieceStatus: "Proving", }