Skip to content

Commit

Permalink
Merge pull request #314 from filecoin-project/feat/tanlang/update-uns…
Browse files Browse the repository at this point in the history
…eal-api

feat: update unseal api / 更新 unseal 的接口
  • Loading branch information
simlecode authored Mar 31, 2023
2 parents 34c3e7e + 05566fb commit d1cd381
Show file tree
Hide file tree
Showing 8 changed files with 108 additions and 75 deletions.
50 changes: 17 additions & 33 deletions dagstore/market_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,52 +85,36 @@ func (m *marketAPI) IsUnsealed(ctx context.Context, pieceCid cid.Cid) (bool, err
for _, deal := range deals {
deal := deal

var isUnsealed bool
// Throttle this path to avoid flooding the storage subsystem.
err := m.throttle.Do(ctx, func(ctx context.Context) (err error) {
// todo ProofType can not be passed, SP processes itself?
isUnsealed, err = m.gatewayMarketClient.IsUnsealed(ctx, deal.Proposal.Provider, pieceCid,
deal.SectorNumber,
vSharedTypes.PaddedByteIndex(deal.Offset.Unpadded()),
deal.Proposal.PieceSize)

// send SectorsUnsealPiece task
wps, err := m.pieceStorageMgr.FindStorageForWrite(int64(deal.Proposal.PieceSize))
if err != nil {
return fmt.Errorf("failed to check if sector %d for deal %d was unsealed: %w", deal.SectorNumber, deal.DealID, err)
return fmt.Errorf("failed to find storage to write %s: %w", pieceCid, err)
}

if isUnsealed {
// send SectorsUnsealPiece task
wps, err := m.pieceStorageMgr.FindStorageForWrite(int64(deal.Proposal.PieceSize))
if err != nil {
return fmt.Errorf("failed to find storage to write %s: %w", pieceCid, err)
}

pieceTransfer, err := wps.GetPieceTransfer(ctx, pieceCid.String())
if err != nil {
return fmt.Errorf("get piece transfer for %s: %w", pieceCid, err)
}

return m.gatewayMarketClient.SectorsUnsealPiece(
ctx,
deal.Proposal.Provider,
pieceCid,
deal.SectorNumber,
vSharedTypes.PaddedByteIndex(deal.Offset.Unpadded()),
deal.Proposal.PieceSize,
pieceTransfer,
)
pieceTransfer, err := wps.GetPieceTransfer(ctx, pieceCid.String())
if err != nil {
return fmt.Errorf("get piece transfer for %s: %w", pieceCid, err)
}

return nil
return m.gatewayMarketClient.SectorsUnsealPiece(
ctx,
deal.Proposal.Provider,
pieceCid,
deal.SectorNumber,
vSharedTypes.PaddedByteIndex(deal.Offset.Unpadded()),
deal.Proposal.PieceSize,
pieceTransfer,
)
})

if err != nil {
log.Warnf("failed to check/retrieve unsealed sector: %s", err)
continue // move on to the next match.
}

if isUnsealed {
return true, nil
}
return true, nil
}

// we don't have an unsealed sector containing the piece
Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ require (
github.com/filecoin-project/go-statestore v0.2.0
github.com/filecoin-project/specs-actors/v2 v2.3.6
github.com/filecoin-project/specs-actors/v7 v7.0.1
github.com/filecoin-project/venus v1.10.2-0.20230329075658-98ec86740c07
github.com/filecoin-project/venus v1.10.2-0.20230330090548-2e3f39feceb1
github.com/filecoin-project/venus-auth v1.10.2-0.20230308100319-913815325d5e
github.com/filecoin-project/venus-messager v1.10.2-0.20230309071456-7cd8d49c6e9a
github.com/golang/mock v1.6.0
Expand All @@ -40,7 +40,7 @@ require (
github.com/howeyc/gopass v0.0.0-20210920133722-c8aef6fb66ef
github.com/ipfs-force-community/metrics v1.0.1-0.20220824061112-ac916bacf2ea
github.com/ipfs-force-community/venus-common-utils v0.0.0-20220217030526-e5e4c6bc14f7
github.com/ipfs-force-community/venus-gateway v1.10.2-0.20230320070449-17b514ccd356
github.com/ipfs-force-community/venus-gateway v1.10.2-0.20230331030234-365136f176ef
github.com/ipfs/go-blockservice v0.5.0
github.com/ipfs/go-cid v0.3.2
github.com/ipfs/go-cidutil v0.1.0
Expand Down
8 changes: 4 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -468,8 +468,8 @@ github.com/filecoin-project/storetheindex v0.4.30-0.20221114113647-683091f8e893
github.com/filecoin-project/storetheindex v0.4.30-0.20221114113647-683091f8e893/go.mod h1:S7590oDimBvXMUtzWsBXoshu9HtYKwtXl47zAK9rcP8=
github.com/filecoin-project/test-vectors/schema v0.0.5/go.mod h1:iQ9QXLpYWL3m7warwvK1JC/pTri8mnfEmKygNDqqY6E=
github.com/filecoin-project/venus v1.2.4/go.mod h1:hJULXHGAnWuq5S5KRtPkwbT8DqgM9II7NwyNU7t59D0=
github.com/filecoin-project/venus v1.10.2-0.20230329075658-98ec86740c07 h1:o7olPXy9+ah3PfzkIASK2qTtPoeKSXBjeb7szJZ55vc=
github.com/filecoin-project/venus v1.10.2-0.20230329075658-98ec86740c07/go.mod h1:eCV4+qsHdDg7FXB8xLn5w/ay+Uu5pG3oAlPsB1nb6qU=
github.com/filecoin-project/venus v1.10.2-0.20230330090548-2e3f39feceb1 h1:wOzhaoLA4AmfHa2ynT/KMf5GM53SSN41X8n0T+alVNw=
github.com/filecoin-project/venus v1.10.2-0.20230330090548-2e3f39feceb1/go.mod h1:eCV4+qsHdDg7FXB8xLn5w/ay+Uu5pG3oAlPsB1nb6qU=
github.com/filecoin-project/venus-auth v1.3.2/go.mod h1:m5Jog2GYxztwP7w3m/iJdv/V1/bTcAVU9rm/CbhxRQU=
github.com/filecoin-project/venus-auth v1.10.2-0.20230308100319-913815325d5e h1:Bxpt1AzPeNxmUnFT2Y8rpabr9x0wIC0Q87DeRmjL2co=
github.com/filecoin-project/venus-auth v1.10.2-0.20230308100319-913815325d5e/go.mod h1:aBfIfNxQkdcY8Rk5wrQn9qRtJpH4RTDdc10Ac+ferzs=
Expand Down Expand Up @@ -834,8 +834,8 @@ github.com/ipfs-force-community/metrics v1.0.1-0.20220824061112-ac916bacf2ea/go.
github.com/ipfs-force-community/venus-common-utils v0.0.0-20210924063144-1d3a5b30de87/go.mod h1:RTVEOzM+hkpqmcEWpyLDkx1oGO5r9ZWCgYxG/CsXzJQ=
github.com/ipfs-force-community/venus-common-utils v0.0.0-20220217030526-e5e4c6bc14f7 h1:v/1/INcqm3kHLauWQYB63MwWJRWGz+3WEuUPp0jzIl8=
github.com/ipfs-force-community/venus-common-utils v0.0.0-20220217030526-e5e4c6bc14f7/go.mod h1:sSTUXgIu95tPHvgcYhdLuELmgPJWCP/pNMFtsrVtOyA=
github.com/ipfs-force-community/venus-gateway v1.10.2-0.20230320070449-17b514ccd356 h1:j+EdBUhTFZgQBoC+AQuucDlIGpdvRvjWZmtn3GIuMsU=
github.com/ipfs-force-community/venus-gateway v1.10.2-0.20230320070449-17b514ccd356/go.mod h1:J2VCU6ANymkl8MjjpHj+y2Wai7EGM1Htd6ImSD5Entw=
github.com/ipfs-force-community/venus-gateway v1.10.2-0.20230331030234-365136f176ef h1:hDvFmxoriRcJS6YJxm04ME1Hjuv3eedoUxnuX/cWLPA=
github.com/ipfs-force-community/venus-gateway v1.10.2-0.20230331030234-365136f176ef/go.mod h1:s9bSNvoFlOWH99ofrSPMn9+vcSEQZG7Cq1hiaPFA11M=
github.com/ipfs/bbloom v0.0.1/go.mod h1:oqo8CVWsJFMOZqTglBG4wydCE4IQA/G2/SEofB0rjUI=
github.com/ipfs/bbloom v0.0.4 h1:Gi+8EGJ2y5qiD5FbsbpX/TMNcJw8gSqr7eyjHa4Fhvs=
github.com/ipfs/bbloom v0.0.4/go.mod h1:cS9YprKXpoZ9lT0n/Mw/a6/aFV6DTjTLYHeA+gyqMG0=
Expand Down
18 changes: 5 additions & 13 deletions piecestorage/filestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package piecestorage

import (
"context"
"encoding/json"
"fmt"
"io"
"io/ioutil"
Expand Down Expand Up @@ -91,22 +90,15 @@ func (f *fsPieceStorage) GetRedirectUrl(_ context.Context, _ string) (string, er
return "", ErrUnsupportRedirect
}

func (f *fsPieceStorage) GetPieceTransfer(_ context.Context, pieceCid string) (*market.Transfer, error) {
func (f *fsPieceStorage) GetPieceTransfer(_ context.Context, pieceCid string) (string, error) {
if f.fsCfg.ReadOnly {
return nil, fmt.Errorf("%s id readonly piece store", f.fsCfg.Name)
return "", fmt.Errorf("%s id readonly piece store", f.fsCfg.Name)
}

dstPath := path.Join(f.baseUrl, pieceCid)
transfer := market.FsTransfer{Path: dstPath}
params, err := json.Marshal(&transfer)
if err != nil {
return nil, fmt.Errorf("construct piece transfer: %w", err)
}
// url example: http://market/resource?resource-id=xxx&store=xxx
url := fmt.Sprintf("/resource?resource-id=%s&store=%s", pieceCid, f.fsCfg.Name)

return &market.Transfer{
Type: market.PiecesTransferFs,
Params: params,
}, nil
return url, nil
}

func (f *fsPieceStorage) Has(_ context.Context, resourceId string) (bool, error) {
Expand Down
4 changes: 2 additions & 2 deletions piecestorage/memstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,8 @@ func (m *MemPieceStore) GetRedirectUrl(_ context.Context, resourceId string) (st
return "", ErrUnsupportRedirect
}

func (m *MemPieceStore) GetPieceTransfer(context.Context, string) (*market.Transfer, error) {
return &market.Transfer{}, nil
func (m *MemPieceStore) GetPieceTransfer(context.Context, string) (string, error) {
return "", nil
}

func (m *MemPieceStore) Validate(s string) error {
Expand Down
33 changes: 14 additions & 19 deletions piecestorage/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package piecestorage

import (
"context"
"encoding/json"
"fmt"
"io"
"math"
Expand Down Expand Up @@ -206,31 +205,27 @@ func (s *s3PieceStorage) GetRedirectUrl(ctx context.Context, resourceId string)
return req.Presign(time.Hour * 24)
}

func (s *s3PieceStorage) GetPieceTransfer(_ context.Context, pieceCid string) (*market.Transfer, error) {
if s.s3Cfg.ReadOnly {
return nil, fmt.Errorf("%s id readonly piece store", s.s3Cfg.Name)
func (s *s3PieceStorage) GetPutObjectUrl(ctx context.Context, resourceId string) (string, error) {
params := &s3.PutObjectInput{
Bucket: aws.String(s.bucket),
Key: aws.String(s.subdirWrapper(resourceId)),
}

transfer := market.S3Transfer{
EndPoint: s.s3Cfg.EndPoint,
Bucket: s.s3Cfg.Bucket,
SubDir: s.s3Cfg.SubDir,

AccessKey: s.s3Cfg.AccessKey,
SecretKey: s.s3Cfg.SecretKey,
Token: s.s3Cfg.Token,
req, _ := s.s3Client.PutObjectRequest(params)
return req.Presign(time.Hour * 24)
}

Key: pieceCid,
func (s *s3PieceStorage) GetPieceTransfer(ctx context.Context, pieceCid string) (string, error) {
if s.s3Cfg.ReadOnly {
return "", fmt.Errorf("%s is readonly piece store", s.s3Cfg.Name)
}
params, err := json.Marshal(&transfer)

url, err := s.GetPutObjectUrl(ctx, pieceCid)
if err != nil {
return nil, fmt.Errorf("construct piece transfer: %w", err)
return "", err
}

return &market.Transfer{
Type: market.PiecesTransferS3,
Params: params,
}, nil
return url, nil
}

func (s *s3PieceStorage) GetStorageStatus() (market.StorageStatus, error) {
Expand Down
2 changes: 1 addition & 1 deletion piecestorage/type.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,5 +38,5 @@ type IPieceStorage interface {
Has(context.Context, string) (bool, error)
Validate(string) error
GetStorageStatus() (market.StorageStatus, error)
GetPieceTransfer(context.Context, string) (*market.Transfer, error)
GetPieceTransfer(context.Context, string) (string, error)
}
64 changes: 63 additions & 1 deletion rpc/piece_storage_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,19 @@ func NewPieceStorageServer(pieceStorageMgr *piecestorage.PieceStorageManager) *P
}

func (p *PieceStorageServer) ServeHTTP(res http.ResponseWriter, req *http.Request) {
if req.Method != http.MethodGet {
switch req.Method {
case http.MethodGet:
p.handleGet(res, req)
case http.MethodPut:
p.handlePut(res, req)
default:
// handle error
logErrorAndResonse(res, http.StatusText(http.StatusMethodNotAllowed), http.StatusMethodNotAllowed)
return
}
}

func (p *PieceStorageServer) handleGet(res http.ResponseWriter, req *http.Request) {
resourceID := req.URL.Query().Get("resource-id")
if len(resourceID) == 0 {
logErrorAndResonse(res, "resource is empty", http.StatusBadRequest)
Expand Down Expand Up @@ -79,6 +87,60 @@ func (p *PieceStorageServer) ServeHTTP(res http.ResponseWriter, req *http.Reques
_, _ = io.Copy(res, r)
}

// handlePut save resource to piece storage
// url example: http://market/resource?resource-id=xxx&store=xxx or http://market/resource?resource-id=xxx&size=xxx
func (p *PieceStorageServer) handlePut(res http.ResponseWriter, req *http.Request) {
ctx := req.Context()
resourceID := req.URL.Query().Get("resource-id")
if len(resourceID) == 0 {
logErrorAndResonse(res, "resource is empty", http.StatusBadRequest)
return
}

if req.Body == nil {
logErrorAndResonse(res, "body is empty", http.StatusBadRequest)
return
}

if !req.URL.Query().Has("store") && !req.URL.Query().Has("size") {
logErrorAndResonse(res, "both store and size is empty", http.StatusBadRequest)
return
}

var store piecestorage.IPieceStorage
if req.URL.Query().Has("store") {
storeName := req.URL.Query().Get("store")

var err error
store, err = p.pieceStorageMgr.GetPieceStorageByName(storeName)
if err != nil {
logErrorAndResonse(res, fmt.Sprintf("fail to get store %s: %s", storeName, err), http.StatusInternalServerError)
return
}
}
if store == nil && req.URL.Query().Has("size") {
sizeStr := req.URL.Query().Get("size")
size, err := strconv.ParseInt(sizeStr, 10, 64)
if err != nil {
logErrorAndResonse(res, fmt.Sprintf("size %s is invalid", sizeStr), http.StatusBadRequest)
return
}
store, err = p.pieceStorageMgr.FindStorageForWrite(size)
if err != nil {
logErrorAndResonse(res, fmt.Sprintf("fail to find store for write: %s", err), http.StatusInternalServerError)
return
}
}

_, err := store.SaveTo(ctx, resourceID, req.Body)
if err != nil {
logErrorAndResonse(res, fmt.Sprintf("fail to save resource %s to store %s: %s", resourceID, store.GetName(), err), http.StatusInternalServerError)
return
}

res.WriteHeader(http.StatusOK)
}

func logErrorAndResonse(res http.ResponseWriter, err string, code int) {
resourceLog.Errorf("resource request fail Code: %d, Message: %s", code, err)
http.Error(res, err, code)
Expand Down

0 comments on commit d1cd381

Please sign in to comment.