Skip to content

Commit

Permalink
feat: filter deal by piece
Browse files Browse the repository at this point in the history
  • Loading branch information
simlecode committed Oct 9, 2023
1 parent ff86f92 commit f39a4d3
Show file tree
Hide file tree
Showing 3 changed files with 68 additions and 4 deletions.
5 changes: 4 additions & 1 deletion cmd/droplet/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,10 @@ func runDaemon(cctx *cli.Context) error {
if err = router.Handle("/resource", rpc.NewPieceStorageServer(resAPI.PieceStorageMgr)).GetError(); err != nil {
return fmt.Errorf("handle 'resource' failed: %w", err)
}
httpRetrievalServer := httpretrieval.NewServer(resAPI.PieceStorageMgr)
httpRetrievalServer, err := httpretrieval.NewServer(resAPI.PieceStorageMgr, resAPI)
if err != nil {
return err
}

var iMarket marketapiV1.IMarketStruct
permission.PermissionProxy(marketapiV1.IMarket(resAPI), &iMarket)
Expand Down
38 changes: 36 additions & 2 deletions retrievalprovider/httpretrieval/server.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,18 @@
package httpretrieval

import (
"context"
"fmt"
"io"
"net/http"
"strings"
"time"

"github.com/NYTimes/gziphandler"
"github.com/filecoin-project/go-fil-markets/storagemarket"
marketAPI "github.com/filecoin-project/venus/venus-shared/api/market/v1"
"github.com/filecoin-project/venus/venus-shared/types"
marketTypes "github.com/filecoin-project/venus/venus-shared/types/market"
"github.com/ipfs-force-community/droplet/v2/piecestorage"
"github.com/ipfs/go-cid"
logging "github.com/ipfs/go-log/v2"
Expand All @@ -19,10 +23,11 @@ var log = logging.Logger("httpserver")

type Server struct {
pieceMgr *piecestorage.PieceStorageManager
api marketAPI.IMarket
}

func NewServer(pieceMgr *piecestorage.PieceStorageManager) *Server {
return &Server{pieceMgr: pieceMgr}
func NewServer(pieceMgr *piecestorage.PieceStorageManager, api marketAPI.IMarket) (*Server, error) {
return &Server{pieceMgr: pieceMgr, api: api}, nil
}

func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
Expand All @@ -41,9 +46,20 @@ func (s *Server) RetrievalByPieceCID(w http.ResponseWriter, r *http.Request) {
pieceCIDStr := pieceCID.String()
log := log.With("piece cid", pieceCIDStr)
log.Infof("start retrieval deal, Range: %s", r.Header.Get("Range"))

_, err = s.listDealsByPiece(ctx, pieceCIDStr)
if err != nil {
log.Warn(err)
badResponse(w, http.StatusNotFound, err)
return
}

store, err := s.pieceMgr.FindStorageForRead(ctx, pieceCIDStr)
if err != nil {
log.Warn(err)
// if errors.Is(err, piecestorage.ErrorNotFoundForRead) {
// todo: unseal data
// }
badResponse(w, http.StatusNotFound, err)
return
}
Expand All @@ -67,6 +83,24 @@ func (s *Server) RetrievalByPieceCID(w http.ResponseWriter, r *http.Request) {
log.Info("end retrieval deal")
}

func (s *Server) listDealsByPiece(ctx context.Context, piece string) ([]marketTypes.MinerDeal, error) {
activeState := storagemarket.StorageDealActive
p := &marketTypes.StorageDealQueryParams{
PieceCID: piece,
Page: marketTypes.Page{Limit: 100},
State: &activeState,
}
deals, err := s.api.MarketListIncompleteDeals(ctx, p)
if err != nil {
return nil, err
}
if len(deals) == 0 {
return nil, fmt.Errorf("not found deal")
}

return deals, nil
}

func serveContent(w http.ResponseWriter, r *http.Request, content io.ReadSeeker, log *zap.SugaredLogger) {
// Set the Content-Type header explicitly so that http.ServeContent doesn't
// try to do it implicitly
Expand Down
29 changes: 28 additions & 1 deletion retrievalprovider/httpretrieval/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,14 @@ import (
"testing"
"time"

"github.com/filecoin-project/venus/venus-shared/api/market/v1/mock"
"github.com/filecoin-project/venus/venus-shared/types"
"github.com/filecoin-project/venus/venus-shared/types/market"
"github.com/golang/mock/gomock"
"github.com/gorilla/mux"
"github.com/ipfs-force-community/droplet/v2/config"
"github.com/ipfs-force-community/droplet/v2/piecestorage"
"github.com/ipfs/go-cid"
"github.com/stretchr/testify/assert"
)

Expand Down Expand Up @@ -67,6 +72,8 @@ func TestRetrievalByPiece(t *testing.T) {
assert.NoError(t, config.SaveConfig(cfg))

pieceStr := "baga6ea4seaqpzcr744w2rvqhkedfqbuqrbo7xtkde2ol6e26khu3wni64nbpaeq"
piece, err := cid.Decode(pieceStr)
assert.NoError(t, err)
buf := &bytes.Buffer{}
f, err := os.Create(filepath.Join(tmpDri, pieceStr))
assert.NoError(t, err)
Expand All @@ -79,7 +86,18 @@ func TestRetrievalByPiece(t *testing.T) {

pieceStorage, err := piecestorage.NewPieceStorageManager(&cfg.PieceStorage)
assert.NoError(t, err)
s := NewServer(pieceStorage)
ctrl := gomock.NewController(t)
m := mock.NewMockIMarket(ctrl)
m.EXPECT().MarketListIncompleteDeals(gomock.Any(), gomock.Any()).DoAndReturn(
func(ctx context.Context, p *market.StorageDealQueryParams) ([]market.MinerDeal, error) {
if p.PieceCID != pieceStr {
return nil, fmt.Errorf("not found deal")
}
return append([]market.MinerDeal{}, market.MinerDeal{ClientDealProposal: types.ClientDealProposal{Proposal: types.DealProposal{PieceCID: piece}}}), nil
}).AnyTimes()

s, err := NewServer(pieceStorage, m)
assert.NoError(t, err)
port := "34897"
startHTTPServer(ctx, t, port, s)

Expand All @@ -93,6 +111,15 @@ func TestRetrievalByPiece(t *testing.T) {
data, err := io.ReadAll(resp.Body)
assert.NoError(t, err)
assert.Equal(t, buf.Bytes(), data)

// deal not exist
url = fmt.Sprintf("http://127.0.0.1:%s/piece/%s", port, "bafybeiakou6e7hnx4ms2yangplzl6viapsoyo6phlee6bwrg4j2xt37m3q")
req, err = http.NewRequest(http.MethodGet, url, nil)
assert.NoError(t, err)
resp, err = http.DefaultClient.Do(req)
assert.NoError(t, err)
defer resp.Body.Close() // nolint
assert.Equal(t, http.StatusNotFound, resp.StatusCode)
}

func startHTTPServer(ctx context.Context, t *testing.T, port string, s *Server) {
Expand Down

0 comments on commit f39a4d3

Please sign in to comment.