From f39a4d35237eed3837a1f5bdb46b8b707360f6dc Mon Sep 17 00:00:00 2001 From: simlecode <69969590+simlecode@users.noreply.github.com> Date: Thu, 3 Aug 2023 15:16:07 +0800 Subject: [PATCH] feat: filter deal by piece --- cmd/droplet/run.go | 5 ++- retrievalprovider/httpretrieval/server.go | 38 ++++++++++++++++++- .../httpretrieval/server_test.go | 29 +++++++++++++- 3 files changed, 68 insertions(+), 4 deletions(-) diff --git a/cmd/droplet/run.go b/cmd/droplet/run.go index 7fada1fc..caef01eb 100644 --- a/cmd/droplet/run.go +++ b/cmd/droplet/run.go @@ -271,7 +271,10 @@ func runDaemon(cctx *cli.Context) error { if err = router.Handle("/resource", rpc.NewPieceStorageServer(resAPI.PieceStorageMgr)).GetError(); err != nil { return fmt.Errorf("handle 'resource' failed: %w", err) } - httpRetrievalServer := httpretrieval.NewServer(resAPI.PieceStorageMgr) + httpRetrievalServer, err := httpretrieval.NewServer(resAPI.PieceStorageMgr, resAPI) + if err != nil { + return err + } var iMarket marketapiV1.IMarketStruct permission.PermissionProxy(marketapiV1.IMarket(resAPI), &iMarket) diff --git a/retrievalprovider/httpretrieval/server.go b/retrievalprovider/httpretrieval/server.go index b2fa6dc0..3349f54d 100644 --- a/retrievalprovider/httpretrieval/server.go +++ b/retrievalprovider/httpretrieval/server.go @@ -1,6 +1,7 @@ package httpretrieval import ( + "context" "fmt" "io" "net/http" @@ -8,7 +9,10 @@ import ( "time" "github.com/NYTimes/gziphandler" + "github.com/filecoin-project/go-fil-markets/storagemarket" + marketAPI "github.com/filecoin-project/venus/venus-shared/api/market/v1" "github.com/filecoin-project/venus/venus-shared/types" + marketTypes "github.com/filecoin-project/venus/venus-shared/types/market" "github.com/ipfs-force-community/droplet/v2/piecestorage" "github.com/ipfs/go-cid" logging "github.com/ipfs/go-log/v2" @@ -19,10 +23,11 @@ var log = logging.Logger("httpserver") type Server struct { pieceMgr *piecestorage.PieceStorageManager + api marketAPI.IMarket } -func NewServer(pieceMgr *piecestorage.PieceStorageManager) *Server { - return &Server{pieceMgr: pieceMgr} +func NewServer(pieceMgr *piecestorage.PieceStorageManager, api marketAPI.IMarket) (*Server, error) { + return &Server{pieceMgr: pieceMgr, api: api}, nil } func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) { @@ -41,9 +46,20 @@ func (s *Server) RetrievalByPieceCID(w http.ResponseWriter, r *http.Request) { pieceCIDStr := pieceCID.String() log := log.With("piece cid", pieceCIDStr) log.Infof("start retrieval deal, Range: %s", r.Header.Get("Range")) + + _, err = s.listDealsByPiece(ctx, pieceCIDStr) + if err != nil { + log.Warn(err) + badResponse(w, http.StatusNotFound, err) + return + } + store, err := s.pieceMgr.FindStorageForRead(ctx, pieceCIDStr) if err != nil { log.Warn(err) + // if errors.Is(err, piecestorage.ErrorNotFoundForRead) { + // todo: unseal data + // } badResponse(w, http.StatusNotFound, err) return } @@ -67,6 +83,24 @@ func (s *Server) RetrievalByPieceCID(w http.ResponseWriter, r *http.Request) { log.Info("end retrieval deal") } +func (s *Server) listDealsByPiece(ctx context.Context, piece string) ([]marketTypes.MinerDeal, error) { + activeState := storagemarket.StorageDealActive + p := &marketTypes.StorageDealQueryParams{ + PieceCID: piece, + Page: marketTypes.Page{Limit: 100}, + State: &activeState, + } + deals, err := s.api.MarketListIncompleteDeals(ctx, p) + if err != nil { + return nil, err + } + if len(deals) == 0 { + return nil, fmt.Errorf("not found deal") + } + + return deals, nil +} + func serveContent(w http.ResponseWriter, r *http.Request, content io.ReadSeeker, log *zap.SugaredLogger) { // Set the Content-Type header explicitly so that http.ServeContent doesn't // try to do it implicitly diff --git a/retrievalprovider/httpretrieval/server_test.go b/retrievalprovider/httpretrieval/server_test.go index 85cab2fa..964c8635 100644 --- a/retrievalprovider/httpretrieval/server_test.go +++ b/retrievalprovider/httpretrieval/server_test.go @@ -13,9 +13,14 @@ import ( "testing" "time" + "github.com/filecoin-project/venus/venus-shared/api/market/v1/mock" + "github.com/filecoin-project/venus/venus-shared/types" + "github.com/filecoin-project/venus/venus-shared/types/market" + "github.com/golang/mock/gomock" "github.com/gorilla/mux" "github.com/ipfs-force-community/droplet/v2/config" "github.com/ipfs-force-community/droplet/v2/piecestorage" + "github.com/ipfs/go-cid" "github.com/stretchr/testify/assert" ) @@ -67,6 +72,8 @@ func TestRetrievalByPiece(t *testing.T) { assert.NoError(t, config.SaveConfig(cfg)) pieceStr := "baga6ea4seaqpzcr744w2rvqhkedfqbuqrbo7xtkde2ol6e26khu3wni64nbpaeq" + piece, err := cid.Decode(pieceStr) + assert.NoError(t, err) buf := &bytes.Buffer{} f, err := os.Create(filepath.Join(tmpDri, pieceStr)) assert.NoError(t, err) @@ -79,7 +86,18 @@ func TestRetrievalByPiece(t *testing.T) { pieceStorage, err := piecestorage.NewPieceStorageManager(&cfg.PieceStorage) assert.NoError(t, err) - s := NewServer(pieceStorage) + ctrl := gomock.NewController(t) + m := mock.NewMockIMarket(ctrl) + m.EXPECT().MarketListIncompleteDeals(gomock.Any(), gomock.Any()).DoAndReturn( + func(ctx context.Context, p *market.StorageDealQueryParams) ([]market.MinerDeal, error) { + if p.PieceCID != pieceStr { + return nil, fmt.Errorf("not found deal") + } + return append([]market.MinerDeal{}, market.MinerDeal{ClientDealProposal: types.ClientDealProposal{Proposal: types.DealProposal{PieceCID: piece}}}), nil + }).AnyTimes() + + s, err := NewServer(pieceStorage, m) + assert.NoError(t, err) port := "34897" startHTTPServer(ctx, t, port, s) @@ -93,6 +111,15 @@ func TestRetrievalByPiece(t *testing.T) { data, err := io.ReadAll(resp.Body) assert.NoError(t, err) assert.Equal(t, buf.Bytes(), data) + + // deal not exist + url = fmt.Sprintf("http://127.0.0.1:%s/piece/%s", port, "bafybeiakou6e7hnx4ms2yangplzl6viapsoyo6phlee6bwrg4j2xt37m3q") + req, err = http.NewRequest(http.MethodGet, url, nil) + assert.NoError(t, err) + resp, err = http.DefaultClient.Do(req) + assert.NoError(t, err) + defer resp.Body.Close() // nolint + assert.Equal(t, http.StatusNotFound, resp.StatusCode) } func startHTTPServer(ctx context.Context, t *testing.T, port string, s *Server) {