Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: filter retrieval by deal piece #454

Merged
merged 2 commits into from
Oct 10, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion cmd/droplet/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,10 @@ func runDaemon(cctx *cli.Context) error {
if err = router.Handle("/resource", rpc.NewPieceStorageServer(resAPI.PieceStorageMgr)).GetError(); err != nil {
return fmt.Errorf("handle 'resource' failed: %w", err)
}
httpRetrievalServer := httpretrieval.NewServer(resAPI.PieceStorageMgr)
httpRetrievalServer, err := httpretrieval.NewServer(resAPI.PieceStorageMgr, resAPI)
if err != nil {
return err
}

var iMarket marketapiV1.IMarketStruct
permission.PermissionProxy(marketapiV1.IMarket(resAPI), &iMarket)
Expand Down
3 changes: 3 additions & 0 deletions models/badger/storage_deal.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,9 @@ func (sdr *storageDealRepo) ListDeal(ctx context.Context, params *types.StorageD
deal.State == storagemarket.StorageDealExpired || deal.State == storagemarket.StorageDealError) {
return false, nil
}
if len(params.PieceCID) != 0 && deal.Proposal.PieceCID.String() != params.PieceCID {
return false, nil
}
if count >= params.Offset && count < end {
storageDeals = append(storageDeals, deal)
}
Expand Down
15 changes: 15 additions & 0 deletions models/badger/storage_deal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,7 @@ func TestListDeal(t *testing.T) {
ctx, r, dealCases := prepareStorageDealTest(t)

peers := []peer.ID{peer.ID("1"), peer.ID("2")}
byPiece := make(map[string]int)
miner := []address.Address{dealCases[0].Proposal.Provider, testutil.AddressProvider()(t)}
states := []storagemarket.StorageDealStatus{
storagemarket.StorageDealAcceptWait,
Expand All @@ -315,6 +316,7 @@ func TestListDeal(t *testing.T) {
deal.State = states[i%4]
err := r.SaveDeal(ctx, &deal)
assert.NoError(t, err)
byPiece[deal.Proposal.PieceCID.String()]++
}

// refresh UpdatedAt and CreationTime
Expand Down Expand Up @@ -388,6 +390,19 @@ func TestListDeal(t *testing.T) {
})
assert.NoError(t, err)
assert.Len(t, deals, 2)

// test piece
for piece, count := range byPiece {
deals, err = r.ListDeal(ctx, &markettypes.StorageDealQueryParams{
Page: markettypes.Page{
Limit: 100,
},
PieceCID: piece,
})
assert.NoError(t, err)
assert.Len(t, deals, count)
}

}

func TestGetStoragePieceInfo(t *testing.T) {
Expand Down
3 changes: 3 additions & 0 deletions models/mysql/storage_deal.go
Original file line number Diff line number Diff line change
Expand Up @@ -509,6 +509,9 @@ func (sdr *storageDealRepo) ListDeal(ctx context.Context, params *types.StorageD
if params.State != nil {
query.Where("state = ?", params.State)
}
if len(params.PieceCID) != 0 {
query.Where("cdp_piece_cid = ?", params.PieceCID)
}
if discardFailedDeal {
states := []storagemarket.StorageDealStatus{storagemarket.StorageDealFailing,
storagemarket.StorageDealExpired, storagemarket.StorageDealError, storagemarket.StorageDealSlashed}
Expand Down
11 changes: 11 additions & 0 deletions models/mysql/storage_deal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,17 @@ func TestListDeal(t *testing.T) {
res, err = r.StorageDealRepo().ListDeal(ctx, &types.StorageDealQueryParams{Page: defPage, DiscardFailedDeal: true})
assert.NoError(t, err)
assert.Len(t, res, 2)

// test piece
piece := dbStorageDealCases[0].PieceCID.String()
rows, err = getFullRows(dbStorageDealCases[0])
assert.NoError(t, err)
sql, vars, err = getSQL(newQuery().Where("cdp_piece_cid = ?", piece).Limit(caseCount).Find(&storageDeals))
assert.NoError(t, err)
mock.ExpectQuery(regexp.QuoteMeta(sql)).WithArgs(vars...).WillReturnRows(rows)
res, err = r.StorageDealRepo().ListDeal(ctx, &types.StorageDealQueryParams{Page: defPage, PieceCID: piece})
assert.NoError(t, err)
assert.Len(t, res, 1)
}

func TestListDealByAddr(t *testing.T) {
Expand Down
38 changes: 36 additions & 2 deletions retrievalprovider/httpretrieval/server.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,18 @@
package httpretrieval

import (
"context"
"fmt"
"io"
"net/http"
"strings"
"time"

"github.com/NYTimes/gziphandler"
"github.com/filecoin-project/go-fil-markets/storagemarket"
marketAPI "github.com/filecoin-project/venus/venus-shared/api/market/v1"
"github.com/filecoin-project/venus/venus-shared/types"
marketTypes "github.com/filecoin-project/venus/venus-shared/types/market"
"github.com/ipfs-force-community/droplet/v2/piecestorage"
"github.com/ipfs/go-cid"
logging "github.com/ipfs/go-log/v2"
Expand All @@ -19,10 +23,11 @@ var log = logging.Logger("httpserver")

type Server struct {
pieceMgr *piecestorage.PieceStorageManager
api marketAPI.IMarket
}

func NewServer(pieceMgr *piecestorage.PieceStorageManager) *Server {
return &Server{pieceMgr: pieceMgr}
func NewServer(pieceMgr *piecestorage.PieceStorageManager, api marketAPI.IMarket) (*Server, error) {
return &Server{pieceMgr: pieceMgr, api: api}, nil
}

func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
Expand All @@ -41,9 +46,20 @@ func (s *Server) RetrievalByPieceCID(w http.ResponseWriter, r *http.Request) {
pieceCIDStr := pieceCID.String()
log := log.With("piece cid", pieceCIDStr)
log.Infof("start retrieval deal, Range: %s", r.Header.Get("Range"))

_, err = s.listDealsByPiece(ctx, pieceCIDStr)
if err != nil {
log.Warn(err)
badResponse(w, http.StatusNotFound, err)
return
}

store, err := s.pieceMgr.FindStorageForRead(ctx, pieceCIDStr)
if err != nil {
log.Warn(err)
// if errors.Is(err, piecestorage.ErrorNotFoundForRead) {
// todo: unseal data
// }
badResponse(w, http.StatusNotFound, err)
return
}
Expand All @@ -67,6 +83,24 @@ func (s *Server) RetrievalByPieceCID(w http.ResponseWriter, r *http.Request) {
log.Info("end retrieval deal")
}

func (s *Server) listDealsByPiece(ctx context.Context, piece string) ([]marketTypes.MinerDeal, error) {
activeState := storagemarket.StorageDealActive
p := &marketTypes.StorageDealQueryParams{
PieceCID: piece,
Page: marketTypes.Page{Limit: 100},
State: &activeState,
}
deals, err := s.api.MarketListIncompleteDeals(ctx, p)
if err != nil {
return nil, err
}
if len(deals) == 0 {
return nil, fmt.Errorf("not found deal")
}

return deals, nil
}

func serveContent(w http.ResponseWriter, r *http.Request, content io.ReadSeeker, log *zap.SugaredLogger) {
// Set the Content-Type header explicitly so that http.ServeContent doesn't
// try to do it implicitly
Expand Down
29 changes: 28 additions & 1 deletion retrievalprovider/httpretrieval/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,14 @@ import (
"testing"
"time"

"github.com/filecoin-project/venus/venus-shared/api/market/v1/mock"
"github.com/filecoin-project/venus/venus-shared/types"
"github.com/filecoin-project/venus/venus-shared/types/market"
"github.com/golang/mock/gomock"
"github.com/gorilla/mux"
"github.com/ipfs-force-community/droplet/v2/config"
"github.com/ipfs-force-community/droplet/v2/piecestorage"
"github.com/ipfs/go-cid"
"github.com/stretchr/testify/assert"
)

Expand Down Expand Up @@ -67,6 +72,8 @@ func TestRetrievalByPiece(t *testing.T) {
assert.NoError(t, config.SaveConfig(cfg))

pieceStr := "baga6ea4seaqpzcr744w2rvqhkedfqbuqrbo7xtkde2ol6e26khu3wni64nbpaeq"
piece, err := cid.Decode(pieceStr)
assert.NoError(t, err)
buf := &bytes.Buffer{}
f, err := os.Create(filepath.Join(tmpDri, pieceStr))
assert.NoError(t, err)
Expand All @@ -79,7 +86,18 @@ func TestRetrievalByPiece(t *testing.T) {

pieceStorage, err := piecestorage.NewPieceStorageManager(&cfg.PieceStorage)
assert.NoError(t, err)
s := NewServer(pieceStorage)
ctrl := gomock.NewController(t)
m := mock.NewMockIMarket(ctrl)
m.EXPECT().MarketListIncompleteDeals(gomock.Any(), gomock.Any()).DoAndReturn(
func(ctx context.Context, p *market.StorageDealQueryParams) ([]market.MinerDeal, error) {
if p.PieceCID != pieceStr {
return nil, fmt.Errorf("not found deal")
}
return append([]market.MinerDeal{}, market.MinerDeal{ClientDealProposal: types.ClientDealProposal{Proposal: types.DealProposal{PieceCID: piece}}}), nil
}).AnyTimes()

s, err := NewServer(pieceStorage, m)
assert.NoError(t, err)
port := "34897"
startHTTPServer(ctx, t, port, s)

Expand All @@ -93,6 +111,15 @@ func TestRetrievalByPiece(t *testing.T) {
data, err := io.ReadAll(resp.Body)
assert.NoError(t, err)
assert.Equal(t, buf.Bytes(), data)

// deal not exist
url = fmt.Sprintf("http://127.0.0.1:%s/piece/%s", port, "bafybeiakou6e7hnx4ms2yangplzl6viapsoyo6phlee6bwrg4j2xt37m3q")
req, err = http.NewRequest(http.MethodGet, url, nil)
assert.NoError(t, err)
resp, err = http.DefaultClient.Do(req)
assert.NoError(t, err)
defer resp.Body.Close() // nolint
assert.Equal(t, http.StatusNotFound, resp.StatusCode)
}

func startHTTPServer(ctx context.Context, t *testing.T, port string, s *Server) {
Expand Down
Loading