Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat/tanlang/filtrate deal by deal state before assigned #202

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions models/badger/storage_deal.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,29 @@ func (sdr *storageDealRepo) GetDealsByPieceStatus(ctx context.Context, mAddr add
})
}

func (sdr *storageDealRepo) GetDealsByPieceStatusAndDealStatus(ctx context.Context, mAddr address.Address, pieceStatus types.PieceStatus, dealStatus ...storagemarket.StorageDealStatus) ([]*types.MinerDeal, error) {
var deals []*types.MinerDeal
dict := map[storagemarket.StorageDealStatus]struct{}{}
for _, status := range dealStatus {
dict[status] = struct{}{}
}

return deals, travelDeals(ctx, sdr.ds, func(inDeal *types.MinerDeal) (stop bool, err error) {
if inDeal.PieceStatus != pieceStatus {
return
}
if _, ok := dict[inDeal.State]; !ok && len(dealStatus) != 0 {
return
}
if mAddr != address.Undef && inDeal.ClientDealProposal.Proposal.Provider != mAddr {
return
}

deals = append(deals, inDeal)
return false, nil
})
}

func (sdr *storageDealRepo) GetPieceSize(ctx context.Context, pieceCID cid.Cid) (uint64, abi.PaddedPieceSize, error) {
var deal *types.MinerDeal

Expand Down
26 changes: 19 additions & 7 deletions models/badger/storage_deal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,21 +88,33 @@ func TestStorageDeal(t *testing.T) {
assert.Equal(t, dealCases[0], *res[0])
})

t.Run("GetDealsByPieceStatus", func(t *testing.T) {
t.Run("With Provider", func(t *testing.T) {
res, err := r.GetDealsByPieceStatus(ctx, dealCases[0].Proposal.Provider, dealCases[0].PieceStatus)
t.Run("GetDealsByPieceStatusAndDealStatus", func(t *testing.T) {
t.Run("With DealStatus", func(t *testing.T) {
res, err := r.GetDealsByPieceStatusAndDealStatus(ctx, dealCases[0].Proposal.Provider, dealCases[0].PieceStatus, dealCases[0].State)
assert.NoError(t, err)
assert.Equal(t, 1, len(res))
assert.Equal(t, dealCases[0], *res[0])
})

t.Run("With Provider", func(t *testing.T) {
res, err := r.GetDealsByPieceStatus(ctx, address.Undef, dealCases[0].PieceStatus)
t.Run("Without DealStatus", func(t *testing.T) {
res, err := r.GetDealsByPieceStatusAndDealStatus(ctx, dealCases[0].Proposal.Provider, dealCases[0].PieceStatus)
assert.NoError(t, err)
assert.Equal(t, 1, len(res))
assert.Equal(t, dealCases[0], *res[0])
})

t.Run("Without Provider", func(t *testing.T) {
res, err := r.GetDealsByPieceStatusAndDealStatus(ctx, address.Undef, dealCases[0].PieceStatus, dealCases[0].State)
assert.NoError(t, err)
assert.Equal(t, 1, len(res))
assert.Equal(t, dealCases[0], *res[0])
})

t.Run("Will Return None", func(t *testing.T) {
res, err := r.GetDealsByPieceStatusAndDealStatus(ctx, address.Undef, dealCases[0].PieceStatus, 0)
assert.NoError(t, err)
assert.Equal(t, 0, len(res))
})
})

t.Run("GetDealsByDataCidAndDealStatus", func(t *testing.T) {
Expand All @@ -123,14 +135,14 @@ func TestStorageDeal(t *testing.T) {

t.Run("GetDealByAddrAndStatus", func(t *testing.T) {
t.Run("With Provider", func(t *testing.T) {
res, err := r.GetDealByAddrAndStatus(ctx, dealCases[0].Proposal.Provider, dealCases[0].State, 0, 10)
res, err := r.GetDealByAddrAndStatus(ctx, dealCases[0].Proposal.Provider, dealCases[0].State)
assert.NoError(t, err)
assert.Equal(t, 1, len(res))
assert.Equal(t, dealCases[0], *res[0])
})

t.Run("Without Provider", func(t *testing.T) {
res, err := r.GetDealByAddrAndStatus(ctx, address.Undef, dealCases[0].State, 0, 10)
res, err := r.GetDealByAddrAndStatus(ctx, address.Undef, dealCases[0].State)
assert.NoError(t, err)
assert.Equal(t, 1, len(res))
assert.Equal(t, dealCases[0], *res[0])
Expand Down
5 changes: 4 additions & 1 deletion models/mysql/storage_deal.go
Original file line number Diff line number Diff line change
Expand Up @@ -516,8 +516,11 @@ func (sdr *storageDealRepo) GetDealByDealID(ctx context.Context, mAddr address.A
return toStorageDeal(dbDeal)
}

func (sdr *storageDealRepo) GetDealsByPieceStatus(ctx context.Context, mAddr address.Address, pieceStatus types.PieceStatus) ([]*types.MinerDeal, error) {
func (sdr *storageDealRepo) GetDealsByPieceStatusAndDealStatus(ctx context.Context, mAddr address.Address, pieceStatus types.PieceStatus, dealStatus ...storagemarket.StorageDealStatus) ([]*types.MinerDeal, error) {
query := sdr.WithContext(ctx).Table(storageDealTableName).Where("piece_status = ?", pieceStatus)
if len(dealStatus) > 0 {
query.Where("state in ?", dealStatus)
}
if mAddr != address.Undef {
query.Where("cdp_provider=?", DBAddress(mAddr).String())
}
Expand Down
43 changes: 31 additions & 12 deletions models/mysql/storage_deal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,26 +217,45 @@ func testGetGetDealByDealID(t *testing.T, r repo.Repo, mock sqlmock.Sqlmock) {
assert.Equal(t, deal, res)
}

func testGetDealsByPieceStatus(t *testing.T, r repo.Repo, mock sqlmock.Sqlmock) {
func testGetDealsByPieceStatusAndDealStatus(t *testing.T, r repo.Repo, mock sqlmock.Sqlmock) {
deal := storageDealCases[0]
dbDeal := dbStorageDealCases[0]

db, err := getMysqlDryrunDB()
assert.NoError(t, err)

rows, err := getFullRows(dbDeal)
assert.NoError(t, err)
t.Run("with deal status", func(t *testing.T) {
rows, err := getFullRows(dbDeal)
assert.NoError(t, err)
var md []storageDeal
sql, vars, err := getSQL(db.Table((&storageDeal{}).TableName()).Where("piece_status = ?", deal.PieceStatus).Where("state in ?", []storagemarket.StorageDealStatus{dbDeal.State}).Where("cdp_provider=?", DBAddress(deal.Proposal.Provider).String()).Find(&md))
assert.NoError(t, err)
assert.NotEqual(t, "", sql)

var md []storageDeal
sql, vars, err := getSQL(db.Table((&storageDeal{}).TableName()).Where("piece_status = ?", deal.PieceStatus).Where("cdp_provider=?", DBAddress(deal.Proposal.Provider).String()).Find(&md))
assert.NoError(t, err)
mock.ExpectQuery(regexp.QuoteMeta(sql)).WithArgs(vars...).WillReturnRows(rows)

mock.ExpectQuery(regexp.QuoteMeta(sql)).WithArgs(vars...).WillReturnRows(rows)
res, err := r.StorageDealRepo().GetDealsByPieceStatusAndDealStatus(context.Background(), deal.Proposal.Provider, deal.PieceStatus, deal.State)
assert.NoError(t, err)
assert.Equal(t, 1, len(res))
assert.Equal(t, deal, res[0])
})

t.Run("without deal status", func(t *testing.T) {
rows, err := getFullRows(dbDeal)
assert.NoError(t, err)
var md []storageDeal
sql, vars, err := getSQL(db.Table((&storageDeal{}).TableName()).Where("piece_status = ?", deal.PieceStatus).Where("cdp_provider=?", DBAddress(deal.Proposal.Provider).String()).Find(&md))
assert.NoError(t, err)
assert.NotEqual(t, "", sql)

mock.ExpectQuery(regexp.QuoteMeta(sql)).WithArgs(vars...).WillReturnRows(rows)

res, err := r.StorageDealRepo().GetDealsByPieceStatusAndDealStatus(context.Background(), deal.Proposal.Provider, deal.PieceStatus)
assert.NoError(t, err)
assert.Equal(t, 1, len(res))
assert.Equal(t, deal, res[0])
})

res, err := r.StorageDealRepo().GetDealsByPieceStatus(context.Background(), deal.Proposal.Provider, deal.PieceStatus)
assert.NoError(t, err)
assert.Equal(t, 1, len(res))
assert.Equal(t, deal, res[0])
}

func testUpdateDealStatus(t *testing.T, r repo.Repo, mock sqlmock.Sqlmock) {
Expand Down Expand Up @@ -428,7 +447,7 @@ func TestStorageDealRepo(t *testing.T) {

t.Run("mysql test GetDealByAddrAndStatus", wrapper(testGetDealByAddrAndStatus, r, mock))
t.Run("mysql test GetDealByDealID", wrapper(testGetGetDealByDealID, r, mock))
t.Run("mysql test GetDealsByPieceStatus", wrapper(testGetDealsByPieceStatus, r, mock))
t.Run("mysql test GetDealsByPieceStatus", wrapper(testGetDealsByPieceStatusAndDealStatus, r, mock))

t.Run("mysql test UpdateDealStatus", wrapper(testUpdateDealStatus, r, mock))
t.Run("mysql test ListDeal", wrapper(testListDeal, r, mock))
Expand Down
4 changes: 2 additions & 2 deletions models/repo/repo.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ type StorageDealRepo interface {

//todo rename Getxxx to Listxxx if return deals list
GetDeals(ctx context.Context, mAddr address.Address, pageIndex, pageSize int) ([]*types.MinerDeal, error)
//GetDealsByPieceStatus list deals by providor and piece status, but if addr is Undef, only filter by piece status
GetDealsByPieceStatus(ctx context.Context, mAddr address.Address, pieceStatus types.PieceStatus) ([]*types.MinerDeal, error)
//GetDealsByPieceStatusAndDealStatus list deals by providor, piece status and deal status, but if addr is Undef, only filter by piece status
GetDealsByPieceStatusAndDealStatus(ctx context.Context, mAddr address.Address, pieceStatus types.PieceStatus, dealStatus ...storagemarket.StorageDealStatus) ([]*types.MinerDeal, error)
//GetDealsByDataCidAndDealStatus query deals from address data cid and deal status, if mAddr equal undef wont filter by address
GetDealsByDataCidAndDealStatus(ctx context.Context, mAddr address.Address, dataCid cid.Cid, pieceStatuss []types.PieceStatus) ([]*types.MinerDeal, error)
GetDealsByPieceCidAndStatus(ctx context.Context, piececid cid.Cid, statues ...storagemarket.StorageDealStatus) ([]*types.MinerDeal, error)
Expand Down
4 changes: 2 additions & 2 deletions models/storage_deal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,12 +179,12 @@ func testStorageDeal(t *testing.T, dealRepo repo.StorageDealRepo) {
assert.Len(t, deals, 1)
compareDeal(t, deals[0], deal2)

deals, err = dealRepo.GetDealsByPieceStatus(ctx, deal2.ClientDealProposal.Proposal.Provider, types.Proving)
deals, err = dealRepo.GetDealsByPieceStatusAndDealStatus(ctx, deal2.ClientDealProposal.Proposal.Provider, types.Proving)
assert.Nil(t, err)
assert.Len(t, deals, 1)
compareDeal(t, deals[0], deal2)

deals, err = dealRepo.GetDealsByPieceStatus(ctx, address.Undef, types.Proving)
deals, err = dealRepo.GetDealsByPieceStatusAndDealStatus(ctx, address.Undef, types.Proving)
assert.Nil(t, err)
assert.Len(t, deals, 2)

Expand Down
9 changes: 3 additions & 6 deletions storageprovider/deal_assigner.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-fil-markets/piecestore"
"github.com/filecoin-project/go-fil-markets/storagemarket"
"github.com/filecoin-project/go-state-types/abi"
"github.com/ipfs/go-cid"

Expand Down Expand Up @@ -145,7 +146,7 @@ func (ps *dealAssigner) GetUnPackedDeals(ctx context.Context, miner address.Addr
spec.MaxPiece = defaultMaxPiece
}

mds, err := ps.repo.StorageDealRepo().GetDealsByPieceStatus(ctx, miner, types.Undefine)
mds, err := ps.repo.StorageDealRepo().GetDealsByPieceStatusAndDealStatus(ctx, miner, types.Undefine, storagemarket.StorageDealAwaitingPreCommit)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -197,18 +198,14 @@ func (ps *dealAssigner) AssignUnPackedDeals(ctx context.Context, sid abi.SectorI

// TODO: is this concurrent safe?
if err := ps.repo.Transaction(func(txRepo repo.TxRepo) error {
mds, err := txRepo.StorageDealRepo().GetDealsByPieceStatus(ctx, maddr, types.Undefine)
mds, err := txRepo.StorageDealRepo().GetDealsByPieceStatusAndDealStatus(ctx, maddr, types.Undefine, storagemarket.StorageDealAwaitingPreCommit)
if err != nil {
return err
}

var deals []*types.DealInfoIncludePath

for _, md := range mds {
// TODO: 要排除不可密封状态的订单?
if md.DealID == 0 || isTerminateState(md) {
continue
}

// 订单筛选和组合的逻辑完全由 pickAndAlign 完成
deals = append(deals, &types.DealInfoIncludePath{
Expand Down