Skip to content

Commit

Permalink
Merge pull request #49 from filecoin-project/opt/refactor-travel-func
Browse files Browse the repository at this point in the history
refactor badger db travel function
  • Loading branch information
zl03jsj authored Nov 25, 2021
2 parents 013abd2 + 4a70c7e commit 454c5af
Show file tree
Hide file tree
Showing 3 changed files with 93 additions and 19 deletions.
47 changes: 28 additions & 19 deletions models/badger/storage_deal.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,17 @@ import (
"bytes"
"errors"
"github.com/filecoin-project/go-address"
cborrpc "github.com/filecoin-project/go-cbor-util"
"github.com/filecoin-project/go-fil-markets/piecestore"
"github.com/filecoin-project/go-fil-markets/storagemarket"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/venus-market/types"
"golang.org/x/xerrors"

cborrpc "github.com/filecoin-project/go-cbor-util"
"github.com/filecoin-project/go-statestore"
"github.com/filecoin-project/venus-market/models/repo"
"github.com/filecoin-project/venus-market/types"
"github.com/ipfs/go-cid"
"github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/query"
"golang.org/x/xerrors"
)

type storageDealRepo struct {
Expand Down Expand Up @@ -118,11 +117,11 @@ func (sdr *storageDealRepo) GetDealByAddrAndStatus(addr address.Address, status
var err error
if err = sdr.travelDeals(
func(deal *types.MinerDeal) (err error) {
if deal.ClientDealProposal.Proposal.Provider == addr && deal.State == status {
storageDeals = append(storageDeals, deal)
}
return nil
}); err != nil {
if deal.ClientDealProposal.Proposal.Provider == addr && deal.State == status {
storageDeals = append(storageDeals, deal)
}
return nil
}); err != nil {
if xerrors.Is(err, justWantStopTravelErr) {
return storageDeals, nil
}
Expand All @@ -143,7 +142,7 @@ func (sdr *storageDealRepo) UpdateDealStatus(proposalCid cid.Cid, status storage

func (sdr *storageDealRepo) ListDeal(miner address.Address) ([]*types.MinerDeal, error) {
storageDeals := make([]*types.MinerDeal, 0)
if err := sdr.travelDeals(func(deal *types.MinerDeal) (err error) {
if err := travelDeals(sdr.ds, func(deal *types.MinerDeal) (err error) {
if deal.ClientDealProposal.Proposal.Provider == miner {
storageDeals = append(storageDeals, deal)
}
Expand Down Expand Up @@ -223,14 +222,13 @@ var justWantStopTravelErr = errors.New("stop travel")
func (dsr *storageDealRepo) GetDealByDealID(mAddr address.Address, dealID abi.DealID) (*types.MinerDeal, error) {
var deal *types.MinerDeal
var err error
if err = dsr.travelDeals(
func(inDeal *types.MinerDeal) error {
if inDeal.ClientDealProposal.Proposal.Provider == mAddr && inDeal.DealID == dealID {
deal = inDeal
return xerrors.Errorf("find the deal, so stop:%w", justWantStopTravelErr)
}
return nil
}); err != nil {
if err = travelDeals(dsr.ds, func(inDeal *types.MinerDeal) (err error) {
if inDeal.ClientDealProposal.Proposal.Provider == mAddr && inDeal.DealID == dealID {
deal = inDeal
return xerrors.Errorf("find the deal, so stop:%w", justWantStopTravelErr)
}
return nil
}); err != nil {
if xerrors.Is(err, justWantStopTravelErr) {
return deal, nil
}
Expand All @@ -239,7 +237,7 @@ func (dsr *storageDealRepo) GetDealByDealID(mAddr address.Address, dealID abi.De
return nil, repo.ErrNotFound
}

func (dsr *storageDealRepo) GetDealsByPieceStatus(mAddr address.Address, pieceStatus string) ([]*types.MinerDeal, error) {
func (dsr *storageDealRepo) GetDealsByPieceStatusV0(mAddr address.Address, pieceStatus string) ([]*types.MinerDeal, error) {
var deals []*types.MinerDeal
var err error
if err = dsr.travelDeals(
Expand All @@ -254,3 +252,14 @@ func (dsr *storageDealRepo) GetDealsByPieceStatus(mAddr address.Address, pieceSt

return deals, nil
}

func (dsr *storageDealRepo) GetDealsByPieceStatus(mAddr address.Address, pieceStatus string) ([]*types.MinerDeal, error) {
var deals []*types.MinerDeal

return deals, travelDeals(dsr.ds, func(inDeal *types.MinerDeal) (err error) {
if inDeal.ClientDealProposal.Proposal.Provider == mAddr && inDeal.PieceStatus == pieceStatus {
deals = append(deals, inDeal)
}
return nil
})
}
64 changes: 64 additions & 0 deletions models/badger/utils.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package badger

import (
"bytes"
cborrpc "github.com/filecoin-project/go-cbor-util"
"github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/query"
cbg "github.com/whyrusleeping/cbor-gen"
"golang.org/x/xerrors"
"reflect"
)

func checkCallbackAndGetParamType(i interface{}) (reflect.Type, error) {
t := reflect.TypeOf(i)
if t.Kind() != reflect.Func {
return nil, xerrors.Errorf("must be a function")
}
if t.NumIn() != 1 {
return nil, xerrors.Errorf("callback must and only have 1 param")
}
if t.NumOut() != 1 {
return nil, xerrors.Errorf("callback must and only have 1 return value")
}
in := t.In(0)
if !in.Implements(reflect.TypeOf((*cbg.CBORUnmarshaler)(nil)).Elem()) {
return nil, xerrors.Errorf("param must be a CBORUnmarshaler")
}
if !t.Out(0).Implements(reflect.TypeOf((*error)(nil)).Elem()) {
return nil, xerrors.Errorf("return value must be an error interface")
}
return in.Elem(), nil
}

func travelDeals(ds datastore.Batching, callback interface{}) error {
instanceType, err := checkCallbackAndGetParamType(callback)
if err != nil {
return err
}

result, err := ds.Query(query.Query{})
if err != nil {
return err
}

defer result.Close() //nolint:errcheck

for res := range result.Next() {
if res.Error != nil {
return err
}
i := reflect.New(instanceType).Interface()
unmarshaler := i.(cbg.CBORUnmarshaler)
if err = cborrpc.ReadCborRPC(bytes.NewReader(res.Value), unmarshaler); err != nil {
return err
}
rets := reflect.ValueOf(callback).Call([]reflect.Value{
reflect.ValueOf(unmarshaler)})

if !rets[0].IsNil() {
return rets[0].Interface().(error)
}
}
return nil
}
1 change: 1 addition & 0 deletions models/storage_deal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ func TestStorageDeal(t *testing.T) {
t.Run("MinerDealMarshal", testCborMarshal)

t.Run("mysql", func(t *testing.T) {
t.Skip("")
repo := MysqlDB(t)
dealRepo := repo.StorageDealRepo()
defer func() {
Expand Down

0 comments on commit 454c5af

Please sign in to comment.