From e4b0c547b3b8ec0cf36fda9fc4bc7e58884db60e Mon Sep 17 00:00:00 2001 From: Tiance <1033935631@qq.com> Date: Mon, 9 Jan 2023 17:20:29 +0800 Subject: [PATCH] feat: unsealed from sp through venus-gateway --- dagstore/market_api.go | 87 ++++++++++++++++++++++++++++++++++--- dagstore/market_api_test.go | 2 +- dagstore/modules.go | 9 +++- go.mod | 5 +-- go.sum | 10 ++--- piecestorage/filestore.go | 38 ++++++++++++---- piecestorage/memstore.go | 4 ++ piecestorage/s3.go | 60 ++++++++++++++++++------- piecestorage/storagemgr.go | 10 ++--- piecestorage/type.go | 3 +- 10 files changed, 182 insertions(+), 46 deletions(-) diff --git a/dagstore/market_api.go b/dagstore/market_api.go index 135c666f..91cacd84 100644 --- a/dagstore/market_api.go +++ b/dagstore/market_api.go @@ -12,13 +12,15 @@ import ( "github.com/ipfs/go-cid" "github.com/filecoin-project/dagstore/mount" + "github.com/filecoin-project/dagstore/throttle" "github.com/filecoin-project/go-padreader" - gatewayAPIV2 "github.com/filecoin-project/venus/venus-shared/api/gateway/v2" + vSharedTypes "github.com/filecoin-project/venus/venus-shared/types" marketMetrics "github.com/filecoin-project/venus-market/v2/metrics" "github.com/filecoin-project/venus-market/v2/models/repo" "github.com/filecoin-project/venus-market/v2/piecestorage" + "github.com/filecoin-project/venus-market/v2/storageprovider" "github.com/filecoin-project/venus-market/v2/utils" ) @@ -35,17 +37,28 @@ type marketAPI struct { useTransient bool metricsCtx metrics.MetricsCtx gatewayMarketClient gatewayAPIV2.IMarketClient + + throttle throttle.Throttler } var _ MarketAPI = (*marketAPI)(nil) -func NewMarketAPI(ctx metrics.MetricsCtx, repo repo.Repo, pieceStorageMgr *piecestorage.PieceStorageManager, gatewayMarketClient gatewayAPIV2.IMarketClient, useTransient bool) MarketAPI { +func NewMarketAPI( + ctx metrics.MetricsCtx, + repo repo.Repo, + pieceStorageMgr *piecestorage.PieceStorageManager, + gatewayMarketClient gatewayAPIV2.IMarketClient, + useTransient bool, + concurrency int) MarketAPI { + return &marketAPI{ pieceRepo: repo.StorageDealRepo(), pieceStorageMgr: pieceStorageMgr, useTransient: useTransient, metricsCtx: ctx, gatewayMarketClient: gatewayMarketClient, + + throttle: throttle.Fixed(concurrency), } } @@ -56,11 +69,75 @@ func (m *marketAPI) Start(_ context.Context) error { func (m *marketAPI) IsUnsealed(ctx context.Context, pieceCid cid.Cid) (bool, error) { _, err := m.pieceStorageMgr.FindStorageForRead(ctx, pieceCid.String()) if err != nil { - return false, fmt.Errorf("unable to find storage for piece %s %w", pieceCid, err) + log.Warnf("unable to find storage for piece %s: %s", pieceCid, err) + + // check it from the SP through venus-gateway + deals, err := m.pieceRepo.GetDealsByPieceCidAndStatus(ctx, pieceCid, storageprovider.ReadyRetrievalDealStatus...) + if err != nil { + return false, fmt.Errorf("get delas for piece %s: %w", pieceCid, err) + } + + if len(deals) == 0 { + return false, fmt.Errorf("no storage deals found for piece %s", pieceCid) + } + + // check if we have an unsealed deal for the given piece in any of the unsealed sectors. + for _, deal := range deals { + deal := deal + + var isUnsealed bool + // Throttle this path to avoid flooding the storage subsystem. + err := m.throttle.Do(ctx, func(ctx context.Context) (err error) { + // todo ProofType can not be passed, SP processes itself? + isUnsealed, err = m.gatewayMarketClient.IsUnsealed(ctx, deal.Proposal.Provider, pieceCid, + deal.SectorNumber, + vSharedTypes.PaddedByteIndex(deal.Offset.Unpadded()), + deal.Proposal.PieceSize) + if err != nil { + return fmt.Errorf("failed to check if sector %d for deal %d was unsealed: %w", deal.SectorNumber, deal.DealID, err) + } + + if isUnsealed { + // send SectorsUnsealPiece task + wps, err := m.pieceStorageMgr.FindStorageForWrite(int64(deal.Proposal.PieceSize)) + if err != nil { + return fmt.Errorf("failed to find storage to write %s: %w", pieceCid, err) + } + + pieceTransfer, err := wps.GetPieceTransfer(ctx, pieceCid.String()) + if err != nil { + return fmt.Errorf("get piece transfer for %s: %w", pieceCid, err) + } + + return m.gatewayMarketClient.SectorsUnsealPiece( + ctx, + deal.Proposal.Provider, + pieceCid, + deal.SectorNumber, + vSharedTypes.PaddedByteIndex(deal.Offset.Unpadded()), + deal.Proposal.PieceSize, + pieceTransfer, + ) + } + + return nil + }) + + if err != nil { + log.Warnf("failed to check/retrieve unsealed sector: %s", err) + continue // move on to the next match. + } + + if isUnsealed { + return true, nil + } + } + + // we don't have an unsealed sector containing the piece + return false, nil } + return true, nil - // todo check isunseal from miner - // m.gatewayMarketClient.IsUnsealed() } func (m *marketAPI) FetchFromPieceStorage(ctx context.Context, pieceCid cid.Cid) (mount.Reader, error) { diff --git a/dagstore/market_api_test.go b/dagstore/market_api_test.go index 7e196736..a76ba3c3 100644 --- a/dagstore/market_api_test.go +++ b/dagstore/market_api_test.go @@ -54,7 +54,7 @@ func TestMarket(t *testing.T) { assert.Nil(t, err) // todo: mock IMarketEvent - marketAPI := NewMarketAPI(ctx, r, pmgr, nil, false) + marketAPI := NewMarketAPI(ctx, r, pmgr, nil, false, 100) size, err := marketAPI.GetUnpaddedCARSize(ctx, testResourceId) assert.Nil(t, err) diff --git a/dagstore/modules.go b/dagstore/modules.go index ecec639a..de6c1a80 100644 --- a/dagstore/modules.go +++ b/dagstore/modules.go @@ -31,7 +31,14 @@ const ( // CreateAndStartMarketAPI creates a new MarketAPI adaptor for the dagstore mounts. func CreateAndStartMarketAPI(ctx metrics.MetricsCtx, lc fx.Lifecycle, r *config.DAGStoreConfig, repo repo.Repo, pieceStorage *piecestorage.PieceStorageManager, gatewayMarketClient gatewayAPIV2.IMarketClient) (MarketAPI, error) { - mountApi := NewMarketAPI(ctx, repo, pieceStorage, gatewayMarketClient, r.UseTransient) + mountApi := NewMarketAPI( + ctx, + repo, + pieceStorage, + gatewayMarketClient, + r.UseTransient, + r.MaxConcurrencyStorageCalls, + ) lc.Append(fx.Hook{ OnStart: func(ctx context.Context) error { return mountApi.Start(ctx) diff --git a/go.mod b/go.mod index f399738a..eb9ce2e9 100644 --- a/go.mod +++ b/go.mod @@ -30,7 +30,7 @@ require ( github.com/filecoin-project/go-statestore v0.2.0 github.com/filecoin-project/specs-actors/v2 v2.3.6 github.com/filecoin-project/specs-actors/v7 v7.0.1 - github.com/filecoin-project/venus v1.10.2-0.20230316084941-2180049a244b + github.com/filecoin-project/venus v1.10.2-0.20230320061738-c105ca752826 github.com/filecoin-project/venus-auth v1.10.2-0.20230308100319-913815325d5e github.com/filecoin-project/venus-messager v1.10.2-0.20230309071456-7cd8d49c6e9a github.com/golang/mock v1.6.0 @@ -40,7 +40,7 @@ require ( github.com/howeyc/gopass v0.0.0-20210920133722-c8aef6fb66ef github.com/ipfs-force-community/metrics v1.0.1-0.20220824061112-ac916bacf2ea github.com/ipfs-force-community/venus-common-utils v0.0.0-20220217030526-e5e4c6bc14f7 - github.com/ipfs-force-community/venus-gateway v1.10.2-0.20230309021833-84fdba83f37d + github.com/ipfs-force-community/venus-gateway v1.10.2-0.20230320070449-17b514ccd356 github.com/ipfs/go-blockservice v0.4.0 github.com/ipfs/go-cid v0.3.2 github.com/ipfs/go-cidutil v0.1.0 @@ -137,7 +137,6 @@ require ( github.com/filecoin-project/specs-actors/v4 v4.0.2 // indirect github.com/filecoin-project/specs-actors/v5 v5.0.6 // indirect github.com/filecoin-project/specs-actors/v6 v6.0.2 // indirect - github.com/filecoin-project/specs-storage v0.4.1 // indirect github.com/filecoin-project/storetheindex v0.4.30-0.20221114113647-683091f8e893 // indirect github.com/flynn/noise v1.0.0 // indirect github.com/francoispqt/gojay v1.2.13 // indirect diff --git a/go.sum b/go.sum index 57d9b509..41f2481b 100644 --- a/go.sum +++ b/go.sum @@ -410,7 +410,6 @@ github.com/filecoin-project/go-state-types v0.1.0/go.mod h1:ezYnPf0bNkTsDibL/psS github.com/filecoin-project/go-state-types v0.1.1-0.20210810190654-139e0e79e69e/go.mod h1:ezYnPf0bNkTsDibL/psSz5dy4B5awOJ/E7P2Saeep8g= github.com/filecoin-project/go-state-types v0.1.1/go.mod h1:ezYnPf0bNkTsDibL/psSz5dy4B5awOJ/E7P2Saeep8g= github.com/filecoin-project/go-state-types v0.1.3/go.mod h1:ezYnPf0bNkTsDibL/psSz5dy4B5awOJ/E7P2Saeep8g= -github.com/filecoin-project/go-state-types v0.1.4/go.mod h1:xCA/WfKlC2zcn3fUmDv4IrzznwS98X5XW/irUP3Lhxg= github.com/filecoin-project/go-state-types v0.1.6/go.mod h1:UwGVoMsULoCK+bWjEdd/xLCvLAQFBC7EDT477SKml+Q= github.com/filecoin-project/go-state-types v0.1.8/go.mod h1:UwGVoMsULoCK+bWjEdd/xLCvLAQFBC7EDT477SKml+Q= github.com/filecoin-project/go-state-types v0.1.10/go.mod h1:UwGVoMsULoCK+bWjEdd/xLCvLAQFBC7EDT477SKml+Q= @@ -457,14 +456,13 @@ github.com/filecoin-project/specs-actors/v7 v7.0.1/go.mod h1:tPLEYXoXhcpyLh69Ccq github.com/filecoin-project/specs-actors/v8 v8.0.1 h1:4u0tIRJeT5G7F05lwLRIsDnsrN+bJ5Ixj6h49Q7uE2Y= github.com/filecoin-project/specs-storage v0.2.2/go.mod h1:6cc/lncmAxMUocPi0z1EPCX63beIX7F7UnlmUZ3hLQo= github.com/filecoin-project/specs-storage v0.4.1 h1:yvLEaLZj8f+uByhNC4mFOtCUyL2wQku+NGBp6hjTe9M= -github.com/filecoin-project/specs-storage v0.4.1/go.mod h1:Z2eK6uMwAOSLjek6+sy0jNV2DSsMEENziMUz0GHRFBw= github.com/filecoin-project/storetheindex v0.3.5/go.mod h1:0r3d0kSpK63O6AvLr1CjAINLi+nWD49clzcnKV+GLpI= github.com/filecoin-project/storetheindex v0.4.30-0.20221114113647-683091f8e893 h1:6GCuzxLVHBzlz7y+FkbHh6n0UyoEGWqDwJKQPJoz7bE= github.com/filecoin-project/storetheindex v0.4.30-0.20221114113647-683091f8e893/go.mod h1:S7590oDimBvXMUtzWsBXoshu9HtYKwtXl47zAK9rcP8= github.com/filecoin-project/test-vectors/schema v0.0.5/go.mod h1:iQ9QXLpYWL3m7warwvK1JC/pTri8mnfEmKygNDqqY6E= github.com/filecoin-project/venus v1.2.4/go.mod h1:hJULXHGAnWuq5S5KRtPkwbT8DqgM9II7NwyNU7t59D0= -github.com/filecoin-project/venus v1.10.2-0.20230316084941-2180049a244b h1:T0TIlKIFsOvOKDIADN4P7xBC+coJ7YlbUqift11TCIE= -github.com/filecoin-project/venus v1.10.2-0.20230316084941-2180049a244b/go.mod h1:d6XlyUBZd+SHydYimTTkUr3O5rjYOHQTsydI4Nxy6U8= +github.com/filecoin-project/venus v1.10.2-0.20230320061738-c105ca752826 h1:NnZjr537T102iDVdxm8e/5+t7gJmtfSkpay26KQS9LY= +github.com/filecoin-project/venus v1.10.2-0.20230320061738-c105ca752826/go.mod h1:d6XlyUBZd+SHydYimTTkUr3O5rjYOHQTsydI4Nxy6U8= github.com/filecoin-project/venus-auth v1.3.2/go.mod h1:m5Jog2GYxztwP7w3m/iJdv/V1/bTcAVU9rm/CbhxRQU= github.com/filecoin-project/venus-auth v1.10.2-0.20230308100319-913815325d5e h1:Bxpt1AzPeNxmUnFT2Y8rpabr9x0wIC0Q87DeRmjL2co= github.com/filecoin-project/venus-auth v1.10.2-0.20230308100319-913815325d5e/go.mod h1:aBfIfNxQkdcY8Rk5wrQn9qRtJpH4RTDdc10Ac+ferzs= @@ -827,8 +825,8 @@ github.com/ipfs-force-community/metrics v1.0.1-0.20220824061112-ac916bacf2ea/go. github.com/ipfs-force-community/venus-common-utils v0.0.0-20210924063144-1d3a5b30de87/go.mod h1:RTVEOzM+hkpqmcEWpyLDkx1oGO5r9ZWCgYxG/CsXzJQ= github.com/ipfs-force-community/venus-common-utils v0.0.0-20220217030526-e5e4c6bc14f7 h1:v/1/INcqm3kHLauWQYB63MwWJRWGz+3WEuUPp0jzIl8= github.com/ipfs-force-community/venus-common-utils v0.0.0-20220217030526-e5e4c6bc14f7/go.mod h1:sSTUXgIu95tPHvgcYhdLuELmgPJWCP/pNMFtsrVtOyA= -github.com/ipfs-force-community/venus-gateway v1.10.2-0.20230309021833-84fdba83f37d h1:/ajr1CjW48GP0vwZgONtUOF74nImn6p4dybeGd0UF98= -github.com/ipfs-force-community/venus-gateway v1.10.2-0.20230309021833-84fdba83f37d/go.mod h1:+TOIuiXWzfS5au9pK5oyU8AzDn/aMJhzSXXlQkVwf5A= +github.com/ipfs-force-community/venus-gateway v1.10.2-0.20230320070449-17b514ccd356 h1:j+EdBUhTFZgQBoC+AQuucDlIGpdvRvjWZmtn3GIuMsU= +github.com/ipfs-force-community/venus-gateway v1.10.2-0.20230320070449-17b514ccd356/go.mod h1:J2VCU6ANymkl8MjjpHj+y2Wai7EGM1Htd6ImSD5Entw= github.com/ipfs/bbloom v0.0.1/go.mod h1:oqo8CVWsJFMOZqTglBG4wydCE4IQA/G2/SEofB0rjUI= github.com/ipfs/bbloom v0.0.4 h1:Gi+8EGJ2y5qiD5FbsbpX/TMNcJw8gSqr7eyjHa4Fhvs= github.com/ipfs/bbloom v0.0.4/go.mod h1:cS9YprKXpoZ9lT0n/Mw/a6/aFV6DTjTLYHeA+gyqMG0= diff --git a/piecestorage/filestore.go b/piecestorage/filestore.go index f1ee3e3e..acc49f75 100644 --- a/piecestorage/filestore.go +++ b/piecestorage/filestore.go @@ -2,18 +2,20 @@ package piecestorage import ( "context" + "encoding/json" "fmt" "io" "io/ioutil" "os" "path" + "github.com/filecoin-project/dagstore/mount" + + "github.com/filecoin-project/venus/pkg/util/fsutil" "github.com/filecoin-project/venus/venus-shared/types/market" - "github.com/filecoin-project/dagstore/mount" "github.com/filecoin-project/venus-market/v2/config" "github.com/filecoin-project/venus-market/v2/utils" - "github.com/filecoin-project/venus/pkg/util/fsutil" ) type fsPieceStorage struct { @@ -21,7 +23,7 @@ type fsPieceStorage struct { fsCfg *config.FsPieceStorage } -func (f *fsPieceStorage) Len(ctx context.Context, resourceId string) (int64, error) { +func (f *fsPieceStorage) Len(_ context.Context, resourceId string) (int64, error) { st, err := os.Stat(path.Join(f.baseUrl, resourceId)) if err != nil { return 0, err @@ -33,7 +35,7 @@ func (f *fsPieceStorage) Len(ctx context.Context, resourceId string) (int64, err return st.Size(), err } -func (f *fsPieceStorage) ListResourceIds(ctx context.Context) ([]string, error) { +func (f *fsPieceStorage) ListResourceIds(_ context.Context) ([]string, error) { entries, err := os.ReadDir(f.baseUrl) if err != nil { return nil, err @@ -47,7 +49,7 @@ func (f *fsPieceStorage) ListResourceIds(ctx context.Context) ([]string, error) return resources, nil } -func (f *fsPieceStorage) SaveTo(ctx context.Context, resourceId string, r io.Reader) (int64, error) { +func (f *fsPieceStorage) SaveTo(_ context.Context, resourceId string, r io.Reader) (int64, error) { if f.fsCfg.ReadOnly { return 0, fmt.Errorf("do not write to a 'readonly' piece store") } @@ -67,7 +69,7 @@ func (f *fsPieceStorage) SaveTo(ctx context.Context, resourceId string, r io.Rea return wlen, err } -func (f *fsPieceStorage) GetReaderCloser(ctx context.Context, resourceId string) (io.ReadCloser, error) { +func (f *fsPieceStorage) GetReaderCloser(_ context.Context, resourceId string) (io.ReadCloser, error) { dstPath := path.Join(f.baseUrl, resourceId) fs, err := os.Open(dstPath) if err != nil { @@ -76,7 +78,7 @@ func (f *fsPieceStorage) GetReaderCloser(ctx context.Context, resourceId string) return fs, nil } -func (f *fsPieceStorage) GetMountReader(ctx context.Context, resourceId string) (mount.Reader, error) { +func (f *fsPieceStorage) GetMountReader(_ context.Context, resourceId string) (mount.Reader, error) { dstPath := path.Join(f.baseUrl, resourceId) fs, err := os.Open(dstPath) if err != nil { @@ -89,7 +91,25 @@ func (f *fsPieceStorage) GetRedirectUrl(_ context.Context, _ string) (string, er return "", ErrUnsupportRedirect } -func (f *fsPieceStorage) Has(ctx context.Context, resourceId string) (bool, error) { +func (f *fsPieceStorage) GetPieceTransfer(_ context.Context, pieceCid string) (*market.Transfer, error) { + if f.fsCfg.ReadOnly { + return nil, fmt.Errorf("%s id readonly piece store", f.fsCfg.Name) + } + + dstPath := path.Join(f.baseUrl, pieceCid) + transfer := market.FsTransfer{Path: dstPath} + params, err := json.Marshal(&transfer) + if err != nil { + return nil, fmt.Errorf("construct piece transfer: %w", err) + } + + return &market.Transfer{ + Type: market.PiecesTransferFs, + Params: params, + }, nil +} + +func (f *fsPieceStorage) Has(_ context.Context, resourceId string) (bool, error) { _, err := os.Stat(path.Join(f.baseUrl, resourceId)) if err != nil { if os.IsNotExist(err) { @@ -100,7 +120,7 @@ func (f *fsPieceStorage) Has(ctx context.Context, resourceId string) (bool, erro return true, nil } -func (f *fsPieceStorage) Validate(resourceId string) error { +func (f *fsPieceStorage) Validate(_ string) error { st, err := os.Stat(f.baseUrl) if err != nil { if os.IsNotExist(err) { diff --git a/piecestorage/memstore.go b/piecestorage/memstore.go index ad828047..db022735 100644 --- a/piecestorage/memstore.go +++ b/piecestorage/memstore.go @@ -109,6 +109,10 @@ func (m *MemPieceStore) GetRedirectUrl(_ context.Context, resourceId string) (st return "", ErrUnsupportRedirect } +func (m *MemPieceStore) GetPieceTransfer(context.Context, string) (*market.Transfer, error) { + return &market.Transfer{}, nil +} + func (m *MemPieceStore) Validate(s string) error { return nil } diff --git a/piecestorage/s3.go b/piecestorage/s3.go index 80ee8230..9f7cdd82 100644 --- a/piecestorage/s3.go +++ b/piecestorage/s3.go @@ -2,6 +2,7 @@ package piecestorage import ( "context" + "encoding/json" "fmt" "io" "math" @@ -9,10 +10,12 @@ import ( "strings" "time" - "github.com/filecoin-project/venus/venus-shared/types/market" - logging "github.com/ipfs/go-log/v2" + valid "github.com/asaskevich/govalidator" + + "github.com/filecoin-project/dagstore/mount" + "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/awserr" "github.com/aws/aws-sdk-go/aws/credentials" @@ -20,8 +23,8 @@ import ( "github.com/aws/aws-sdk-go/service/s3" "github.com/aws/aws-sdk-go/service/s3/s3manager" - valid "github.com/asaskevich/govalidator" - "github.com/filecoin-project/dagstore/mount" + "github.com/filecoin-project/venus/venus-shared/types/market" + "github.com/filecoin-project/venus-market/v2/config" "github.com/filecoin-project/venus-market/v2/utils" ) @@ -112,7 +115,7 @@ func NewS3PieceStorage(s3Cfg *config.S3PieceStorage) (IPieceStorage, error) { }, nil } -func (s *s3PieceStorage) SaveTo(ctx context.Context, resourceId string, r io.Reader) (int64, error) { +func (s *s3PieceStorage) SaveTo(_ context.Context, resourceId string, r io.Reader) (int64, error) { if s.s3Cfg.ReadOnly { return 0, fmt.Errorf("do not write to a 'readonly' piece store") } @@ -130,10 +133,10 @@ func (s *s3PieceStorage) SaveTo(ctx context.Context, resourceId string, r io.Rea return int64(countReader.Count()), nil } -func (s *s3PieceStorage) Len(ctx context.Context, piececid string) (int64, error) { +func (s *s3PieceStorage) Len(_ context.Context, pieceCid string) (int64, error) { params := &s3.GetObjectInput{ Bucket: aws.String(s.bucket), - Key: aws.String(s.subdirWrapper(piececid)), + Key: aws.String(s.subdirWrapper(pieceCid)), } result, err := s.s3Client.GetObject(params) @@ -143,7 +146,7 @@ func (s *s3PieceStorage) Len(ctx context.Context, piececid string) (int64, error return *result.ContentLength, nil } -func (s *s3PieceStorage) ListResourceIds(ctx context.Context) ([]string, error) { +func (s *s3PieceStorage) ListResourceIds(_ context.Context) ([]string, error) { params := &s3.ListObjectsV2Input{ Bucket: aws.String(s.bucket), } @@ -166,7 +169,7 @@ func (s *s3PieceStorage) ListResourceIds(ctx context.Context) ([]string, error) return pieces, nil } -func (s s3PieceStorage) GetReaderCloser(ctx context.Context, resourceId string) (io.ReadCloser, error) { +func (s *s3PieceStorage) GetReaderCloser(_ context.Context, resourceId string) (io.ReadCloser, error) { params := &s3.GetObjectInput{ Bucket: aws.String(s.bucket), Key: aws.String(s.subdirWrapper(resourceId)), @@ -179,7 +182,7 @@ func (s s3PieceStorage) GetReaderCloser(ctx context.Context, resourceId string) return result.Body, nil } -func (s s3PieceStorage) GetMountReader(ctx context.Context, resourceId string) (mount.Reader, error) { +func (s *s3PieceStorage) GetMountReader(ctx context.Context, resourceId string) (mount.Reader, error) { len, err := s.Len(ctx, resourceId) if err != nil { return nil, err @@ -187,11 +190,11 @@ func (s s3PieceStorage) GetMountReader(ctx context.Context, resourceId string) ( return newSeekWraper(s.s3Client, s.bucket, s.subdirWrapper(resourceId), len-1), nil } -func (s s3PieceStorage) GetRedirectUrl(ctx context.Context, resourceId string) (string, error) { +func (s *s3PieceStorage) GetRedirectUrl(ctx context.Context, resourceId string) (string, error) { if has, err := s.Has(ctx, resourceId); err != nil { - return "", fmt.Errorf("check object: %s exist error:%w", s.subdirWrapper(resourceId), err) + return "", fmt.Errorf("check object %s exist error: %w", s.subdirWrapper(resourceId), err) } else if !has { - return "", fmt.Errorf("object: %s not exists", s.subdirWrapper(resourceId)) + return "", fmt.Errorf("object %s not exists", s.subdirWrapper(resourceId)) } params := &s3.GetObjectInput{ @@ -203,6 +206,33 @@ func (s s3PieceStorage) GetRedirectUrl(ctx context.Context, resourceId string) ( return req.Presign(time.Hour * 24) } +func (s *s3PieceStorage) GetPieceTransfer(_ context.Context, pieceCid string) (*market.Transfer, error) { + if s.s3Cfg.ReadOnly { + return nil, fmt.Errorf("%s id readonly piece store", s.s3Cfg.Name) + } + + transfer := market.S3Transfer{ + EndPoint: s.s3Cfg.EndPoint, + Bucket: s.s3Cfg.Bucket, + SubDir: s.s3Cfg.SubDir, + + AccessKey: s.s3Cfg.AccessKey, + SecretKey: s.s3Cfg.SecretKey, + Token: s.s3Cfg.Token, + + Key: pieceCid, + } + params, err := json.Marshal(&transfer) + if err != nil { + return nil, fmt.Errorf("construct piece transfer: %w", err) + } + + return &market.Transfer{ + Type: market.PiecesTransferS3, + Params: params, + }, nil +} + func (s *s3PieceStorage) GetStorageStatus() (market.StorageStatus, error) { return market.StorageStatus{ Capacity: 0, @@ -210,7 +240,7 @@ func (s *s3PieceStorage) GetStorageStatus() (market.StorageStatus, error) { }, nil } -func (s *s3PieceStorage) Has(ctx context.Context, piececid string) (bool, error) { +func (s *s3PieceStorage) Has(_ context.Context, piececid string) (bool, error) { params := &s3.HeadObjectInput{ Bucket: aws.String(s.bucket), Key: aws.String(s.subdirWrapper(piececid)), @@ -231,7 +261,7 @@ func (s *s3PieceStorage) Has(ctx context.Context, piececid string) (bool, error) return true, nil } -func (s *s3PieceStorage) Validate(piececid string) error { +func (s *s3PieceStorage) Validate(_ string) error { _, err := s.s3Client.GetBucketAcl(&s3.GetBucketAclInput{ Bucket: aws.String(s.bucket), }) diff --git a/piecestorage/storagemgr.go b/piecestorage/storagemgr.go index 638ec5d5..9c55c507 100644 --- a/piecestorage/storagemgr.go +++ b/piecestorage/storagemgr.go @@ -21,7 +21,7 @@ func NewPieceStorageManager(cfg *config.PieceStorage) (*PieceStorageManager, err // todo: extract name check logic to a function and check blank in name for _, fsCfg := range cfg.Fs { - // check if storage already exist in manager and it's name is not empty + // check if storage already exist in manager, and it's name is not empty if fsCfg.Name == "" { return nil, fmt.Errorf("fs piece storage name is empty, must set storage name in piece storage config `name=yourname`") } @@ -38,7 +38,7 @@ func NewPieceStorageManager(cfg *config.PieceStorage) (*PieceStorageManager, err } for _, s3Cfg := range cfg.S3 { - // check if storage already exist in manager and it's name is not empty + // check if storage already exist in manager, and it's name is not empty if s3Cfg.Name == "" { return nil, fmt.Errorf("s3 pieceStorage name is empty, must set storage name in piece storage config `name=yourname`") } @@ -64,7 +64,7 @@ func (p *PieceStorageManager) FindStorageForRead(ctx context.Context, s string) _ = p.EachPieceStorage(func(st IPieceStorage) error { has, err := st.Has(ctx, s) if err != nil { - log.Warnf("got error while check avaibale in storageg") + log.Warnf("got error while check avaibale in storage: %s", err.Error()) return nil } if has { @@ -74,7 +74,7 @@ func (p *PieceStorageManager) FindStorageForRead(ctx context.Context, s string) }) if len(storages) == 0 { - return nil, fmt.Errorf("unable to find piece in storage %s", s) + return nil, fmt.Errorf("unable to find piece %s in storage", s) } return randStorageSelector(storages) @@ -125,7 +125,7 @@ func (p *PieceStorageManager) AddPieceStorage(s IPieceStorage) error { p.lk.Lock() defer p.lk.Unlock() - // check if storage already exist in manager and it's name is not empty + // check if storage already exist in manager, and it's name is not empty _, ok := p.storages[s.GetName()] if ok { return fmt.Errorf("duplicate storage name: %s", s.GetName()) diff --git a/piecestorage/type.go b/piecestorage/type.go index cd9a327d..e95bfba4 100644 --- a/piecestorage/type.go +++ b/piecestorage/type.go @@ -29,7 +29,7 @@ type IPieceStorage interface { Len(context.Context, string) (int64, error) // ListResourceIds get resource ids from piece store ListResourceIds(ctx context.Context) ([]string, error) - // GetMountReader use direct read if storage have low performance effecitive ReadAt + // GetReaderCloser use direct read if storage have low performance effecitive ReadAt GetReaderCloser(context.Context, string) (io.ReadCloser, error) // GetMountReader used to support dagstore GetMountReader(context.Context, string) (mount.Reader, error) @@ -38,4 +38,5 @@ type IPieceStorage interface { Has(context.Context, string) (bool, error) Validate(string) error GetStorageStatus() (market.StorageStatus, error) + GetPieceTransfer(context.Context, string) (*market.Transfer, error) }