Skip to content

Commit

Permalink
feat: 增加导入离线订单的接口和命令 / add interface and cli import offline deal (#176)
Browse files Browse the repository at this point in the history
* deat: add api to import offline deal
  • Loading branch information
LinZexiao authored Aug 3, 2022
1 parent c0d4f9c commit b6f003f
Show file tree
Hide file tree
Showing 8 changed files with 466 additions and 21 deletions.
4 changes: 4 additions & 0 deletions api/impl/venus_market.go
Original file line number Diff line number Diff line change
Expand Up @@ -908,3 +908,7 @@ func (m MarketNodeImpl) RemovePieceStorage(ctx context.Context, name string) err

return m.Config.RemovePieceStorage(name)
}

func (m MarketNodeImpl) OfflineDealImport(ctx context.Context, deal types.MinerDeal) error {
return m.StorageProvider.ImportOfflineDeal(ctx, deal)
}
68 changes: 68 additions & 0 deletions cli/market.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@ package cli
import (
"bufio"
"context"
"encoding/json"
"errors"
"fmt"
"io"
"io/ioutil"
"log"
"os"
"path/filepath"
Expand Down Expand Up @@ -323,6 +325,7 @@ var StorageDealsCmd = &cli.Command{
Usage: "Manage storage deals and related configuration",
Subcommands: []*cli.Command{
dealsImportDataCmd,
importOfflineDealCmd,
dealsListCmd,
updateStorageDealStateCmd,
storageDealSelectionCmd,
Expand Down Expand Up @@ -365,6 +368,71 @@ var dealsImportDataCmd = &cli.Command{
},
}

var importOfflineDealCmd = &cli.Command{
Name: "import-offlinedeal",
Usage: "Manually import offline deal",
ArgsUsage: "<deal_file_json>",
Flags: []cli.Flag{
// verbose
&cli.BoolFlag{
Name: "verbose",
Usage: "Print verbose output",
Aliases: []string{
"v",
},
},
},
Action: func(cctx *cli.Context) error {
api, closer, err := NewMarketNode(cctx)
if err != nil {
return err
}
defer closer()

ctx := DaemonContext(cctx)

if cctx.Args().Len() < 1 {
return fmt.Errorf("must specify the path of json file which records the deal")
}

fpath := cctx.Args().Get(0)

dealbyte, err := ioutil.ReadFile(fpath)
if err != nil {
return fmt.Errorf("read deal file(%s) fail %w", fpath, err)
}

data := []market.MinerDeal{}
err = json.Unmarshal(dealbyte, &data)
if err != nil {
return fmt.Errorf("parse deal file(%s) fail %w", fpath, err)
}

totalCount := len(data)
importedCount := 0

// if verbose, print the deal info

for i := 0; i < totalCount; i++ {
err := api.OfflineDealImport(ctx, data[i])
if err != nil {
if cctx.Bool("verbose") {
fmt.Printf("( %d / %d ) %s : fail : %v\n", i+1, totalCount, data[i].ProposalCid, err)
}
} else {
importedCount++
if cctx.Bool("verbose") {
fmt.Printf("( %d / %d ) %s : success\n", i+1, totalCount, data[i].ProposalCid)
}
}
}

fmt.Printf("import %d deals, %d deal success , %d deal fail .\n", totalCount, importedCount, totalCount-importedCount)

return nil
},
}

var dealsListCmd = &cli.Command{
Name: "list",
Usage: "List all deals for this miner",
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ require (
github.com/filecoin-project/specs-actors/v7 v7.0.1
github.com/filecoin-project/specs-actors/v8 v8.0.1
github.com/filecoin-project/specs-storage v0.4.1
github.com/filecoin-project/venus v1.6.1-0.20220727093750-977b92ef15ca
github.com/filecoin-project/venus v1.6.1-0.20220728083125-a81e7ae3dc2a
github.com/filecoin-project/venus-auth v1.6.0
github.com/filecoin-project/venus-messager v1.6.2-0.20220722073101-cc0865f9be71
github.com/gbrlsnchs/jwt/v3 v3.0.1
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -451,8 +451,8 @@ github.com/filecoin-project/test-vectors/schema v0.0.5/go.mod h1:iQ9QXLpYWL3m7wa
github.com/filecoin-project/venus v1.2.4/go.mod h1:hJULXHGAnWuq5S5KRtPkwbT8DqgM9II7NwyNU7t59D0=
github.com/filecoin-project/venus v1.6.0/go.mod h1:ukA+xwqDs40lixoa+HDNfuN8b1G4jpm4k0ujceVejSk=
github.com/filecoin-project/venus v1.6.1-0.20220718091042-a51da69e1731/go.mod h1:ukA+xwqDs40lixoa+HDNfuN8b1G4jpm4k0ujceVejSk=
github.com/filecoin-project/venus v1.6.1-0.20220727093750-977b92ef15ca h1:8RxpfBkpMDjMbW2myZf9t+ECJo9o3gPWQBbEL+YSAYc=
github.com/filecoin-project/venus v1.6.1-0.20220727093750-977b92ef15ca/go.mod h1:Ibaxk3dWQuVjrwm656L8jogcv0heFsoNhlqT6MWX9Cg=
github.com/filecoin-project/venus v1.6.1-0.20220728083125-a81e7ae3dc2a h1:0Rf1ttMCuJDaD0vhxUargjFawP9syas10j6Th+lBDgE=
github.com/filecoin-project/venus v1.6.1-0.20220728083125-a81e7ae3dc2a/go.mod h1:Ibaxk3dWQuVjrwm656L8jogcv0heFsoNhlqT6MWX9Cg=
github.com/filecoin-project/venus-auth v1.3.2/go.mod h1:m5Jog2GYxztwP7w3m/iJdv/V1/bTcAVU9rm/CbhxRQU=
github.com/filecoin-project/venus-auth v1.6.0 h1:DLl7q5g1eh6UTpp98MLpRWAI79k6TUw1Myh/RLeaFpU=
github.com/filecoin-project/venus-auth v1.6.0/go.mod h1:x/Cv3zz9z5O+/uqIKgYtk5UsL7nYu+CtiPjyVQ8Lywg=
Expand Down
41 changes: 41 additions & 0 deletions storageprovider/storageprovider.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"io"
"time"

"github.com/filecoin-project/go-fil-markets/storagemarket/impl/providerutils"
"github.com/hannahhoward/go-pubsub"
"github.com/ipfs/go-cid"
"github.com/libp2p/go-libp2p-core/host"
Expand Down Expand Up @@ -66,6 +67,9 @@ type StorageProvider interface {
//ImportPublishedDeal manually import published deals to storage deals
ImportPublishedDeal(ctx context.Context, deal types.MinerDeal) error

//ImportOfflineDeal manually import offline deals to storage deals
ImportOfflineDeal(ctx context.Context, deal types.MinerDeal) error

// SubscribeToEvents listens for events that happen related to storage deals on a provider
SubscribeToEvents(subscriber storagemarket.ProviderSubscriber) shared.Unsubscribe
}
Expand Down Expand Up @@ -438,6 +442,43 @@ func (p *StorageProviderImpl) ImportPublishedDeal(ctx context.Context, deal type
return p.dealStore.SaveDeal(ctx, improtDeal)
}

//ImportPublishedDeal manually import published deals for an storage deal
func (p *StorageProviderImpl) ImportOfflineDeal(ctx context.Context, deal types.MinerDeal) error {
// check deal state
if deal.State != storagemarket.StorageDealWaitingForData {
return fmt.Errorf("deal state %s not match %s", storagemarket.DealStates[deal.State], storagemarket.DealStates[storagemarket.StorageDealWaitingForData])
}

//check if miner exit
if !p.minerMgr.Has(ctx, deal.Proposal.Provider) {
return fmt.Errorf("miner %s not support", deal.Proposal.Provider)
}

//check if local exit the deal
if _, err := p.dealStore.GetDeal(ctx, deal.ProposalCid); err == nil {
return fmt.Errorf("deal exist proposal cid %s id %d", deal.ProposalCid, deal.DealID)
} else if !errors.Is(err, repo.ErrNotFound) {
return err
}

// check client signature
tok, _, err := p.spn.GetChainHead(ctx)
if err != nil {
return fmt.Errorf("node error getting most recent state id: %w", err)
}

if err := providerutils.VerifyProposal(ctx, deal.ClientDealProposal, tok, p.spn.VerifySignature); err != nil {
return fmt.Errorf("verifying StorageDealProposal: %w", err)
}

err = p.dealStore.SaveDeal(ctx, &deal)
if err != nil {
return fmt.Errorf("save miner deal to database %w", err)
}

return nil
}

// AddStorageCollateral adds storage collateral
func (p *StorageProviderImpl) AddStorageCollateral(ctx context.Context, mAddr address.Address, amount abi.TokenAmount) error {
done := make(chan error, 1)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,25 +10,28 @@ import (
"github.com/filecoin-project/go-state-types/big"
"github.com/filecoin-project/venus-market/v2/models"
"github.com/filecoin-project/venus-market/v2/models/badger"
"github.com/filecoin-project/venus-market/v2/storageprovider"
"github.com/filecoin-project/venus-market/v2/utils/test_helper"
"github.com/stretchr/testify/require"
)

// go test -v ./storageadapter -test.run TestStorageAsk -mysql='root:ko2005@tcp(127.0.0.1:3306)/storage_market?charset=utf8mb4&parseTime=True&loc=Local&timeout=10s'

func TestStorageAsk(t *testing.T) {
ctx := context.Background()
t.Run("mysql", func(t *testing.T) {
mysqlAsk := &StorageAsk{repo: models.MysqlDB(t).StorageAskRepo(),
fullNode: test_helper.MockFullnode{T: t}}
mysqlAsk, _ := storageprovider.NewStorageAsk(ctx, models.MysqlDB(t), test_helper.MockFullnode{T: t})

testStorageAsk(t, mysqlAsk)
})
t.Run("badger", func(t *testing.T) {
badgerAsk := &StorageAsk{repo: badger.NewStorageAskRepo(models.BadgerDB(t)),
fullNode: test_helper.MockFullnode{T: t}}
badgerAsk, _ := storageprovider.NewStorageAsk(ctx, badger.NewBadgerRepo(badger.BadgerDSParams{AskDS: models.BadgerDB(t)}),
test_helper.MockFullnode{T: t})
testStorageAsk(t, badgerAsk)
})
}

func testStorageAsk(t *testing.T, repo *StorageAsk) {
func testStorageAsk(t *testing.T, repo storageprovider.IStorageAsk) {
ctx := context.Background()
miner, _ := address.NewFromString("f02438")
price := abi.NewTokenAmount(100)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@ import (
"github.com/filecoin-project/venus-market/v2/models"
network2 "github.com/filecoin-project/venus-market/v2/network"
"github.com/filecoin-project/venus-market/v2/piecestorage"
"github.com/filecoin-project/venus-market/v2/storageprovider"
vCrypto "github.com/filecoin-project/venus/pkg/crypto"
_ "github.com/filecoin-project/venus/pkg/crypto/bls"
_ "github.com/filecoin-project/venus/pkg/crypto/secp"
"github.com/filecoin-project/venus/venus-shared/api/chain/v1/mock"
"github.com/filecoin-project/venus/venus-shared/types"
marketypes "github.com/filecoin-project/venus/venus-shared/types/market"
Expand All @@ -30,14 +34,30 @@ import (
//go:embed testdata/import_deal.json
var importDataJsonString []byte

type dealCase struct {
type publishdealCase struct {
Proposal *marketypes.MinerDeal
Result bool
}

type importOfflineDealResult int

const (
success importOfflineDealResult = iota
dealExists
minerNotFound
signatureInvalid
dealStatusInvalid
)

type offlinedealCase struct {
Proposal *marketypes.MinerDeal
Result importOfflineDealResult
}

type testGroud struct {
DealsOnChain map[abi.DealID]types.MarketDeal
Cases []*dealCase
DealsOnChain map[abi.DealID]types.MarketDeal
PublishDealCases []*publishdealCase
OfflineDealCase []*offlinedealCase
}

func TestStorageProviderImpl_ImportPublishedDeal(t *testing.T) {
Expand All @@ -49,13 +69,44 @@ func TestStorageProviderImpl_ImportPublishedDeal(t *testing.T) {
if err != nil {
t.Error(err)
}
for _, c := range testGround.Cases {
for _, c := range testGround.PublishDealCases {
err = provider.ImportPublishedDeal(ctx, *c.Proposal)
assert.Equal(t, c.Result, err == nil, "ProposalCid: %v, err: %v", c.Proposal.ProposalCid, err)
}
}

func setup(t *testing.T) StorageProvider {
func TestStorageProviderImpl_ImportOfflineDeal(t *testing.T) {
provider := setup(t)
ctx := context.Background()

var testGround testGroud
err := json.Unmarshal(importDataJsonString, &testGround)
if err != nil {
t.Error(err)
}
for _, c := range testGround.OfflineDealCase {
err = provider.ImportOfflineDeal(ctx, *c.Proposal)
switch c.Result {
case success:
assert.NoError(t, err)
t.Log("Success")
case dealExists:
assert.Contains(t, err.Error(), "deal exist")
t.Logf("DealExists: %v", err)
case minerNotFound:
assert.Contains(t, err.Error(), fmt.Sprintf("miner %s not support", c.Proposal.Proposal.Provider.String()))
t.Logf("MinerNotFound: %v", err)
case signatureInvalid:
assert.Contains(t, err.Error(), "verifying StorageDealProposal")
t.Logf("SignatureInvalid: %v", err)
case dealStatusInvalid:
assert.Contains(t, err.Error(), "deal state")
t.Logf("DealStatusInvalid: %v", err)
}
}
}

func setup(t *testing.T) storageprovider.StorageProvider {
ctx := context.Background()
spn := newMockProviderNode()

Expand All @@ -70,7 +121,7 @@ func setup(t *testing.T) StorageProvider {
}

r := models.NewInMemoryRepo()
ask := &StorageAsk{r.StorageAskRepo(), spn}
ask, _ := storageprovider.NewStorageAsk(ctx, r, spn)
h, err := network2.MockHost(ctx)
if err != nil {
t.Error(err)
Expand All @@ -87,7 +138,7 @@ func setup(t *testing.T) StorageProvider {
addrMgr := mockAddrMgr{}

//todo how to mock dagstore
provider, err := NewStorageProvider(ask, h, config.DefaultMarketConfig, &homeDir, psManager, dt, spn, nil, r, addrMgr, nil)
provider, err := storageprovider.NewStorageProvider(ask, h, config.DefaultMarketConfig, &homeDir, psManager, dt, spn, nil, r, addrMgr, nil)
if err != nil {
t.Error(err)
}
Expand All @@ -98,7 +149,7 @@ type mockAddrMgr struct {
}

func (m mockAddrMgr) Has(ctx context.Context, addr address.Address) bool {
return addr.String() == "t01043"
return addr.String() == "t01043" || addr.String() == "t010938"
}

func (m mockAddrMgr) ActorAddress(ctx context.Context) ([]address.Address, error) {
Expand Down Expand Up @@ -167,8 +218,12 @@ func (m *mockProviderNode) SignWithGivenMiner(mAddr address.Address) network.Res
}

func (m *mockProviderNode) GetChainHead(ctx context.Context) (shared.TipSetToken, abi.ChainEpoch, error) {
//TODO implement me
panic("implement me")
head, err := m.ChainHead(ctx)
if err != nil {
return nil, 0, err
}

return head.Key().Bytes(), head.Height(), nil
}

func (m *mockProviderNode) AddFunds(ctx context.Context, addr address.Address, amount abi.TokenAmount) (cid.Cid, error) {
Expand All @@ -192,8 +247,8 @@ func (m *mockProviderNode) GetBalance(ctx context.Context, addr address.Address,
}

func (m *mockProviderNode) VerifySignature(ctx context.Context, signature crypto.Signature, signer address.Address, plaintext []byte, tok shared.TipSetToken) (bool, error) {
//TODO implement me
panic("implement me")
err := vCrypto.Verify(&signature, signer, plaintext)
return err == nil, err
}

func (m *mockProviderNode) WaitForMessage(ctx context.Context, mcid cid.Cid, onCompletion func(exitcode.ExitCode, []byte, cid.Cid, error) error) error {
Expand Down
Loading

0 comments on commit b6f003f

Please sign in to comment.