diff --git a/cmd/droplet/run.go b/cmd/droplet/run.go index 7fada1fc..caef01eb 100644 --- a/cmd/droplet/run.go +++ b/cmd/droplet/run.go @@ -271,7 +271,10 @@ func runDaemon(cctx *cli.Context) error { if err = router.Handle("/resource", rpc.NewPieceStorageServer(resAPI.PieceStorageMgr)).GetError(); err != nil { return fmt.Errorf("handle 'resource' failed: %w", err) } - httpRetrievalServer := httpretrieval.NewServer(resAPI.PieceStorageMgr) + httpRetrievalServer, err := httpretrieval.NewServer(resAPI.PieceStorageMgr, resAPI) + if err != nil { + return err + } var iMarket marketapiV1.IMarketStruct permission.PermissionProxy(marketapiV1.IMarket(resAPI), &iMarket) diff --git a/models/badger/storage_deal.go b/models/badger/storage_deal.go index b7c50e08..49a696d1 100644 --- a/models/badger/storage_deal.go +++ b/models/badger/storage_deal.go @@ -261,6 +261,9 @@ func (sdr *storageDealRepo) ListDeal(ctx context.Context, params *types.StorageD deal.State == storagemarket.StorageDealExpired || deal.State == storagemarket.StorageDealError) { return false, nil } + if len(params.PieceCID) != 0 && deal.Proposal.PieceCID.String() != params.PieceCID { + return false, nil + } if count >= params.Offset && count < end { storageDeals = append(storageDeals, deal) } diff --git a/models/badger/storage_deal_test.go b/models/badger/storage_deal_test.go index ddbbb098..8916c861 100644 --- a/models/badger/storage_deal_test.go +++ b/models/badger/storage_deal_test.go @@ -299,6 +299,7 @@ func TestListDeal(t *testing.T) { ctx, r, dealCases := prepareStorageDealTest(t) peers := []peer.ID{peer.ID("1"), peer.ID("2")} + byPiece := make(map[string]int) miner := []address.Address{dealCases[0].Proposal.Provider, testutil.AddressProvider()(t)} states := []storagemarket.StorageDealStatus{ storagemarket.StorageDealAcceptWait, @@ -315,6 +316,7 @@ func TestListDeal(t *testing.T) { deal.State = states[i%4] err := r.SaveDeal(ctx, &deal) assert.NoError(t, err) + byPiece[deal.Proposal.PieceCID.String()]++ } // refresh UpdatedAt and CreationTime @@ -388,6 +390,19 @@ func TestListDeal(t *testing.T) { }) assert.NoError(t, err) assert.Len(t, deals, 2) + + // test piece + for piece, count := range byPiece { + deals, err = r.ListDeal(ctx, &markettypes.StorageDealQueryParams{ + Page: markettypes.Page{ + Limit: 100, + }, + PieceCID: piece, + }) + assert.NoError(t, err) + assert.Len(t, deals, count) + } + } func TestGetStoragePieceInfo(t *testing.T) { diff --git a/models/mysql/storage_deal.go b/models/mysql/storage_deal.go index bad4ef3a..3a0110a1 100644 --- a/models/mysql/storage_deal.go +++ b/models/mysql/storage_deal.go @@ -509,6 +509,9 @@ func (sdr *storageDealRepo) ListDeal(ctx context.Context, params *types.StorageD if params.State != nil { query.Where("state = ?", params.State) } + if len(params.PieceCID) != 0 { + query.Where("cdp_piece_cid = ?", params.PieceCID) + } if discardFailedDeal { states := []storagemarket.StorageDealStatus{storagemarket.StorageDealFailing, storagemarket.StorageDealExpired, storagemarket.StorageDealError, storagemarket.StorageDealSlashed} diff --git a/models/mysql/storage_deal_test.go b/models/mysql/storage_deal_test.go index 8477856a..84da5bfe 100644 --- a/models/mysql/storage_deal_test.go +++ b/models/mysql/storage_deal_test.go @@ -321,6 +321,17 @@ func TestListDeal(t *testing.T) { res, err = r.StorageDealRepo().ListDeal(ctx, &types.StorageDealQueryParams{Page: defPage, DiscardFailedDeal: true}) assert.NoError(t, err) assert.Len(t, res, 2) + + // test piece + piece := dbStorageDealCases[0].PieceCID.String() + rows, err = getFullRows(dbStorageDealCases[0]) + assert.NoError(t, err) + sql, vars, err = getSQL(newQuery().Where("cdp_piece_cid = ?", piece).Limit(caseCount).Find(&storageDeals)) + assert.NoError(t, err) + mock.ExpectQuery(regexp.QuoteMeta(sql)).WithArgs(vars...).WillReturnRows(rows) + res, err = r.StorageDealRepo().ListDeal(ctx, &types.StorageDealQueryParams{Page: defPage, PieceCID: piece}) + assert.NoError(t, err) + assert.Len(t, res, 1) } func TestListDealByAddr(t *testing.T) { diff --git a/retrievalprovider/httpretrieval/server.go b/retrievalprovider/httpretrieval/server.go index b2fa6dc0..3349f54d 100644 --- a/retrievalprovider/httpretrieval/server.go +++ b/retrievalprovider/httpretrieval/server.go @@ -1,6 +1,7 @@ package httpretrieval import ( + "context" "fmt" "io" "net/http" @@ -8,7 +9,10 @@ import ( "time" "github.com/NYTimes/gziphandler" + "github.com/filecoin-project/go-fil-markets/storagemarket" + marketAPI "github.com/filecoin-project/venus/venus-shared/api/market/v1" "github.com/filecoin-project/venus/venus-shared/types" + marketTypes "github.com/filecoin-project/venus/venus-shared/types/market" "github.com/ipfs-force-community/droplet/v2/piecestorage" "github.com/ipfs/go-cid" logging "github.com/ipfs/go-log/v2" @@ -19,10 +23,11 @@ var log = logging.Logger("httpserver") type Server struct { pieceMgr *piecestorage.PieceStorageManager + api marketAPI.IMarket } -func NewServer(pieceMgr *piecestorage.PieceStorageManager) *Server { - return &Server{pieceMgr: pieceMgr} +func NewServer(pieceMgr *piecestorage.PieceStorageManager, api marketAPI.IMarket) (*Server, error) { + return &Server{pieceMgr: pieceMgr, api: api}, nil } func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) { @@ -41,9 +46,20 @@ func (s *Server) RetrievalByPieceCID(w http.ResponseWriter, r *http.Request) { pieceCIDStr := pieceCID.String() log := log.With("piece cid", pieceCIDStr) log.Infof("start retrieval deal, Range: %s", r.Header.Get("Range")) + + _, err = s.listDealsByPiece(ctx, pieceCIDStr) + if err != nil { + log.Warn(err) + badResponse(w, http.StatusNotFound, err) + return + } + store, err := s.pieceMgr.FindStorageForRead(ctx, pieceCIDStr) if err != nil { log.Warn(err) + // if errors.Is(err, piecestorage.ErrorNotFoundForRead) { + // todo: unseal data + // } badResponse(w, http.StatusNotFound, err) return } @@ -67,6 +83,24 @@ func (s *Server) RetrievalByPieceCID(w http.ResponseWriter, r *http.Request) { log.Info("end retrieval deal") } +func (s *Server) listDealsByPiece(ctx context.Context, piece string) ([]marketTypes.MinerDeal, error) { + activeState := storagemarket.StorageDealActive + p := &marketTypes.StorageDealQueryParams{ + PieceCID: piece, + Page: marketTypes.Page{Limit: 100}, + State: &activeState, + } + deals, err := s.api.MarketListIncompleteDeals(ctx, p) + if err != nil { + return nil, err + } + if len(deals) == 0 { + return nil, fmt.Errorf("not found deal") + } + + return deals, nil +} + func serveContent(w http.ResponseWriter, r *http.Request, content io.ReadSeeker, log *zap.SugaredLogger) { // Set the Content-Type header explicitly so that http.ServeContent doesn't // try to do it implicitly diff --git a/retrievalprovider/httpretrieval/server_test.go b/retrievalprovider/httpretrieval/server_test.go index 85cab2fa..964c8635 100644 --- a/retrievalprovider/httpretrieval/server_test.go +++ b/retrievalprovider/httpretrieval/server_test.go @@ -13,9 +13,14 @@ import ( "testing" "time" + "github.com/filecoin-project/venus/venus-shared/api/market/v1/mock" + "github.com/filecoin-project/venus/venus-shared/types" + "github.com/filecoin-project/venus/venus-shared/types/market" + "github.com/golang/mock/gomock" "github.com/gorilla/mux" "github.com/ipfs-force-community/droplet/v2/config" "github.com/ipfs-force-community/droplet/v2/piecestorage" + "github.com/ipfs/go-cid" "github.com/stretchr/testify/assert" ) @@ -67,6 +72,8 @@ func TestRetrievalByPiece(t *testing.T) { assert.NoError(t, config.SaveConfig(cfg)) pieceStr := "baga6ea4seaqpzcr744w2rvqhkedfqbuqrbo7xtkde2ol6e26khu3wni64nbpaeq" + piece, err := cid.Decode(pieceStr) + assert.NoError(t, err) buf := &bytes.Buffer{} f, err := os.Create(filepath.Join(tmpDri, pieceStr)) assert.NoError(t, err) @@ -79,7 +86,18 @@ func TestRetrievalByPiece(t *testing.T) { pieceStorage, err := piecestorage.NewPieceStorageManager(&cfg.PieceStorage) assert.NoError(t, err) - s := NewServer(pieceStorage) + ctrl := gomock.NewController(t) + m := mock.NewMockIMarket(ctrl) + m.EXPECT().MarketListIncompleteDeals(gomock.Any(), gomock.Any()).DoAndReturn( + func(ctx context.Context, p *market.StorageDealQueryParams) ([]market.MinerDeal, error) { + if p.PieceCID != pieceStr { + return nil, fmt.Errorf("not found deal") + } + return append([]market.MinerDeal{}, market.MinerDeal{ClientDealProposal: types.ClientDealProposal{Proposal: types.DealProposal{PieceCID: piece}}}), nil + }).AnyTimes() + + s, err := NewServer(pieceStorage, m) + assert.NoError(t, err) port := "34897" startHTTPServer(ctx, t, port, s) @@ -93,6 +111,15 @@ func TestRetrievalByPiece(t *testing.T) { data, err := io.ReadAll(resp.Body) assert.NoError(t, err) assert.Equal(t, buf.Bytes(), data) + + // deal not exist + url = fmt.Sprintf("http://127.0.0.1:%s/piece/%s", port, "bafybeiakou6e7hnx4ms2yangplzl6viapsoyo6phlee6bwrg4j2xt37m3q") + req, err = http.NewRequest(http.MethodGet, url, nil) + assert.NoError(t, err) + resp, err = http.DefaultClient.Do(req) + assert.NoError(t, err) + defer resp.Body.Close() // nolint + assert.Equal(t, http.StatusNotFound, resp.StatusCode) } func startHTTPServer(ctx context.Context, t *testing.T, port string, s *Server) {