Skip to content

Commit

Permalink
refactor: integrate new FundManager
Browse files Browse the repository at this point in the history
  • Loading branch information
dirkmc committed Nov 10, 2020
1 parent d8b8581 commit 0588c1e
Show file tree
Hide file tree
Showing 17 changed files with 75 additions and 311 deletions.
4 changes: 0 additions & 4 deletions shared_testutil/test_deal_funds.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@ package shared_testutil
import (
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/go-state-types/big"

"github.com/filecoin-project/go-fil-markets/storagemarket/impl/funds"
)

func NewTestDealFunds() *TestDealFunds {
Expand Down Expand Up @@ -34,5 +32,3 @@ func (f *TestDealFunds) Release(amount abi.TokenAmount) (abi.TokenAmount, error)
f.ReleaseCalls = append(f.ReleaseCalls, amount)
return f.reserved, nil
}

var _ funds.DealFunds = &TestDealFunds{}
4 changes: 0 additions & 4 deletions storagemarket/impl/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ import (
"github.com/filecoin-project/go-fil-markets/storagemarket/impl/clientstates"
"github.com/filecoin-project/go-fil-markets/storagemarket/impl/clientutils"
"github.com/filecoin-project/go-fil-markets/storagemarket/impl/dtutils"
"github.com/filecoin-project/go-fil-markets/storagemarket/impl/funds"
"github.com/filecoin-project/go-fil-markets/storagemarket/impl/requestvalidation"
"github.com/filecoin-project/go-fil-markets/storagemarket/migrations"
"github.com/filecoin-project/go-fil-markets/storagemarket/network"
Expand All @@ -61,7 +60,6 @@ type Client struct {
statemachines fsm.Group
migrateStateMachines func(context.Context) error
pollingInterval time.Duration
dealFunds funds.DealFunds

unsubDataTransfer datatransfer.Unsubscribe
}
Expand All @@ -86,7 +84,6 @@ func NewClient(
discovery *discoveryimpl.Local,
ds datastore.Batching,
scn storagemarket.StorageClientNode,
dealFunds funds.DealFunds,
options ...StorageClientOption,
) (*Client, error) {
carIO := cario.NewCarIO()
Expand All @@ -101,7 +98,6 @@ func NewClient(
pubSub: pubsub.New(clientDispatcher),
readySub: pubsub.New(shared.ReadyDispatcher),
pollingInterval: DefaultPollingInterval,
dealFunds: dealFunds,
}
storageMigrations, err := migrations.ClientMigrations.Build()
if err != nil {
Expand Down
5 changes: 0 additions & 5 deletions storagemarket/impl/client_environments.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
"github.com/filecoin-project/go-multistore"

"github.com/filecoin-project/go-fil-markets/storagemarket"
"github.com/filecoin-project/go-fil-markets/storagemarket/impl/funds"
"github.com/filecoin-project/go-fil-markets/storagemarket/network"
)

Expand Down Expand Up @@ -50,10 +49,6 @@ func (c *clientDealEnvironment) PollingInterval() time.Duration {
return c.c.pollingInterval
}

func (c *clientDealEnvironment) DealFunds() funds.DealFunds {
return c.c.dealFunds
}

type clientStoreGetter struct {
c *Client
}
Expand Down
1 change: 0 additions & 1 deletion storagemarket/impl/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,6 @@ func TestClient_Migrations(t *testing.T) {
deps.PeerResolver,
clientDs,
deps.ClientNode,
deps.ClientDealFunds,
storageimpl.DealPollingInterval(0),
)
require.NoError(t, err)
Expand Down
54 changes: 15 additions & 39 deletions storagemarket/impl/clientstates/client_states.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import (

"github.com/filecoin-project/go-fil-markets/shared"
"github.com/filecoin-project/go-fil-markets/storagemarket"
"github.com/filecoin-project/go-fil-markets/storagemarket/impl/funds"
"github.com/filecoin-project/go-fil-markets/storagemarket/impl/requestvalidation"
"github.com/filecoin-project/go-fil-markets/storagemarket/network"
)
Expand All @@ -33,7 +32,6 @@ type ClientDealEnvironment interface {
RestartDataTransfer(ctx context.Context, chid datatransfer.ChannelID) error
GetProviderDealState(ctx context.Context, proposalCid cid.Cid) (*storagemarket.ProviderDealState, error)
PollingInterval() time.Duration
DealFunds() funds.DealFunds
network.PeerTagger
}

Expand All @@ -44,34 +42,15 @@ type ClientStateEntryFunc func(ctx fsm.Context, environment ClientDealEnvironmen
func EnsureClientFunds(ctx fsm.Context, environment ClientDealEnvironment, deal storagemarket.ClientDeal) error {
node := environment.Node()

tok, _, err := node.GetChainHead(ctx.Context())
if err != nil {
return ctx.Trigger(storagemarket.ClientEventEnsureFundsFailed, xerrors.Errorf("acquiring chain head: %w", err))
}

var requiredFunds abi.TokenAmount
if deal.FundsReserved.Nil() || deal.FundsReserved.IsZero() {
requiredFunds, err = environment.DealFunds().Reserve(deal.Proposal.ClientBalanceRequirement())
if err != nil {
return ctx.Trigger(storagemarket.ClientEventEnsureFundsFailed, xerrors.Errorf("tracking deal funds: %w", err))
}
} else {
requiredFunds = environment.DealFunds().Get()
}

_ = ctx.Trigger(storagemarket.ClientEventFundsReserved, deal.Proposal.ClientBalanceRequirement())

mcid, err := node.EnsureFunds(ctx.Context(), deal.Proposal.Client, deal.Proposal.Client, requiredFunds, tok)

mcid, err := node.ReserveFunds(ctx.Context(), deal.Proposal.Client, deal.Proposal.Client, deal.Proposal.ClientBalanceRequirement())
if err != nil {
return ctx.Trigger(storagemarket.ClientEventEnsureFundsFailed, err)
}

// if no message was sent, and there was no error, funds were already available
if mcid == cid.Undef {
return ctx.Trigger(storagemarket.ClientEventFundsEnsured)
}

// Otherwise wait for funds to be added
return ctx.Trigger(storagemarket.ClientEventFundingInitiated, mcid)
}

Expand Down Expand Up @@ -234,14 +213,7 @@ func ValidateDealPublished(ctx fsm.Context, environment ClientDealEnvironment, d
return ctx.Trigger(storagemarket.ClientEventDealPublishFailed, err)
}

if !deal.FundsReserved.Nil() && !deal.FundsReserved.IsZero() {
_, err = environment.DealFunds().Release(deal.FundsReserved)
if err != nil {
// nonfatal error
log.Warnf("failed to release funds from local tracker: %s", err)
}
_ = ctx.Trigger(storagemarket.ClientEventFundsReleased, deal.FundsReserved)
}
releaseReservedFunds(ctx, environment, deal)

// at this point data transfer is complete, so unprotect peer connection
environment.UntagPeer(deal.Miner, deal.ProposalCid.String())
Expand Down Expand Up @@ -297,14 +269,7 @@ func WaitForDealCompletion(ctx fsm.Context, environment ClientDealEnvironment, d

// FailDeal cleans up a failing deal
func FailDeal(ctx fsm.Context, environment ClientDealEnvironment, deal storagemarket.ClientDeal) error {
if !deal.FundsReserved.Nil() && !deal.FundsReserved.IsZero() {
_, err := environment.DealFunds().Release(deal.FundsReserved)
if err != nil {
// nonfatal error
log.Warnf("failed to release funds from local tracker: %s", err)
}
_ = ctx.Trigger(storagemarket.ClientEventFundsReleased, deal.FundsReserved)
}
releaseReservedFunds(ctx, environment, deal)

// TODO: store in some sort of audit log
log.Errorf("deal %s failed: %s", deal.ProposalCid, deal.Message)
Expand All @@ -314,6 +279,17 @@ func FailDeal(ctx fsm.Context, environment ClientDealEnvironment, deal storagema
return ctx.Trigger(storagemarket.ClientEventFailed)
}

func releaseReservedFunds(ctx fsm.Context, environment ClientDealEnvironment, deal storagemarket.ClientDeal) {
if !deal.FundsReserved.Nil() && !deal.FundsReserved.IsZero() {
err := environment.Node().ReleaseFunds(ctx.Context(), deal.Proposal.Client, deal.FundsReserved)
if err != nil {
// nonfatal error
log.Warnf("failed to release funds: %s", err)
}
_ = ctx.Trigger(storagemarket.ClientEventFundsReleased, deal.FundsReserved)
}
}

func isAccepted(status storagemarket.StorageDealStatus) bool {
return status == storagemarket.StorageDealStaged ||
status == storagemarket.StorageDealSealing ||
Expand Down
36 changes: 15 additions & 21 deletions storagemarket/impl/clientstates/client_states_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
tut "github.com/filecoin-project/go-fil-markets/shared_testutil"
"github.com/filecoin-project/go-fil-markets/storagemarket"
"github.com/filecoin-project/go-fil-markets/storagemarket/impl/clientstates"
"github.com/filecoin-project/go-fil-markets/storagemarket/impl/funds"
smnet "github.com/filecoin-project/go-fil-markets/storagemarket/network"
"github.com/filecoin-project/go-fil-markets/storagemarket/testnodes"
)
Expand All @@ -36,8 +35,8 @@ func TestEnsureFunds(t *testing.T) {
runAndInspect(t, storagemarket.StorageDealEnsureClientFunds, clientstates.EnsureClientFunds, testCase{
inspector: func(deal storagemarket.ClientDeal, env *fakeEnvironment) {
tut.AssertDealState(t, storagemarket.StorageDealFundsEnsured, deal.State)
assert.Equal(t, env.dealFunds.ReserveCalls[0], deal.Proposal.ClientBalanceRequirement())
assert.Len(t, env.dealFunds.ReleaseCalls, 0)
assert.Equal(t, env.node.DealFunds.ReserveCalls[0], deal.Proposal.ClientBalanceRequirement())
assert.Len(t, env.node.DealFunds.ReleaseCalls, 0)
assert.Equal(t, deal.Proposal.ClientBalanceRequirement(), deal.FundsReserved)
},
})
Expand All @@ -49,8 +48,8 @@ func TestEnsureFunds(t *testing.T) {
},
inspector: func(deal storagemarket.ClientDeal, env *fakeEnvironment) {
tut.AssertDealState(t, storagemarket.StorageDealFundsEnsured, deal.State)
assert.Len(t, env.dealFunds.ReserveCalls, 0)
assert.Len(t, env.dealFunds.ReleaseCalls, 0)
assert.Len(t, env.node.DealFunds.ReserveCalls, 0)
assert.Len(t, env.node.DealFunds.ReleaseCalls, 0)
},
})
})
Expand All @@ -59,8 +58,8 @@ func TestEnsureFunds(t *testing.T) {
nodeParams: nodeParams{AddFundsCid: tut.GenerateCids(1)[0]},
inspector: func(deal storagemarket.ClientDeal, env *fakeEnvironment) {
tut.AssertDealState(t, storagemarket.StorageDealClientFunding, deal.State)
assert.Equal(t, env.dealFunds.ReserveCalls[0], deal.Proposal.ClientBalanceRequirement())
assert.Len(t, env.dealFunds.ReleaseCalls, 0)
assert.Equal(t, env.node.DealFunds.ReserveCalls[0], deal.Proposal.ClientBalanceRequirement())
assert.Len(t, env.node.DealFunds.ReleaseCalls, 0)
assert.Equal(t, deal.Proposal.ClientBalanceRequirement(), deal.FundsReserved)
},
})
Expand All @@ -73,8 +72,8 @@ func TestEnsureFunds(t *testing.T) {
inspector: func(deal storagemarket.ClientDeal, env *fakeEnvironment) {
tut.AssertDealState(t, storagemarket.StorageDealFailing, deal.State)
assert.Equal(t, "adding market funds failed: Something went wrong", deal.Message)
assert.Equal(t, env.dealFunds.ReserveCalls[0], deal.Proposal.ClientBalanceRequirement())
assert.Len(t, env.dealFunds.ReleaseCalls, 0)
assert.Equal(t, env.node.DealFunds.ReserveCalls[0], deal.Proposal.ClientBalanceRequirement())
assert.Len(t, env.node.DealFunds.ReleaseCalls, 0)
assert.Equal(t, deal.Proposal.ClientBalanceRequirement(), deal.FundsReserved)
},
})
Expand Down Expand Up @@ -391,7 +390,7 @@ func TestValidateDealPublished(t *testing.T) {
inspector: func(deal storagemarket.ClientDeal, env *fakeEnvironment) {
tut.AssertDealState(t, storagemarket.StorageDealSealing, deal.State)
assert.Equal(t, abi.DealID(5), deal.DealID)
assert.Equal(t, env.dealFunds.ReleaseCalls[0], deal.Proposal.ClientBalanceRequirement())
assert.Equal(t, env.node.DealFunds.ReleaseCalls[0], deal.Proposal.ClientBalanceRequirement())
assert.True(t, deal.FundsReserved.Nil() || deal.FundsReserved.IsZero())
assert.Len(t, env.peerTagger.UntagCalls, 1)
assert.Equal(t, deal.Miner, env.peerTagger.UntagCalls[0])
Expand All @@ -404,7 +403,7 @@ func TestValidateDealPublished(t *testing.T) {
inspector: func(deal storagemarket.ClientDeal, env *fakeEnvironment) {
tut.AssertDealState(t, storagemarket.StorageDealSealing, deal.State)
assert.Equal(t, abi.DealID(5), deal.DealID)
assert.Len(t, env.dealFunds.ReleaseCalls, 0)
assert.Len(t, env.node.DealFunds.ReleaseCalls, 0)
assert.Len(t, env.peerTagger.UntagCalls, 1)
assert.Equal(t, deal.Miner, env.peerTagger.UntagCalls[0])
},
Expand Down Expand Up @@ -508,7 +507,7 @@ func TestFailDeal(t *testing.T) {
},
inspector: func(deal storagemarket.ClientDeal, env *fakeEnvironment) {
tut.AssertDealState(t, storagemarket.StorageDealError, deal.State)
assert.Equal(t, env.dealFunds.ReleaseCalls[0], deal.Proposal.ClientBalanceRequirement())
assert.Equal(t, env.node.DealFunds.ReleaseCalls[0], deal.Proposal.ClientBalanceRequirement())
assert.True(t, deal.FundsReserved.Nil() || deal.FundsReserved.IsZero())
},
})
Expand All @@ -517,7 +516,7 @@ func TestFailDeal(t *testing.T) {
runAndInspect(t, storagemarket.StorageDealFailing, clientstates.FailDeal, testCase{
inspector: func(deal storagemarket.ClientDeal, env *fakeEnvironment) {
tut.AssertDealState(t, storagemarket.StorageDealError, deal.State)
assert.Len(t, env.dealFunds.ReleaseCalls, 0)
assert.Len(t, env.node.DealFunds.ReleaseCalls, 0)
assert.True(t, deal.FundsReserved.Nil() || deal.FundsReserved.IsZero())
},
})
Expand Down Expand Up @@ -578,7 +577,6 @@ func makeExecutor(ctx context.Context,
providerDealState: envParams.providerDealState,
getDealStatusErr: envParams.getDealStatusErr,
pollingInterval: envParams.pollingInterval,
dealFunds: tut.NewTestDealFunds(),
peerTagger: tut.NewTestPeerTagger(),
}

Expand Down Expand Up @@ -617,9 +615,10 @@ type nodeParams struct {
OnDealSlashedEpoch abi.ChainEpoch
}

func makeNode(params nodeParams) storagemarket.StorageClientNode {
func makeNode(params nodeParams) *testnodes.FakeClientNode {
var out testnodes.FakeClientNode
out.SMState = testnodes.NewStorageMarketState()
out.DealFunds = tut.NewTestDealFunds()
out.AddFundsCid = params.AddFundsCid
out.EnsureFundsError = params.EnsureFundsError
out.VerifySignatureFails = params.VerifySignatureFails
Expand All @@ -643,7 +642,7 @@ func makeNode(params nodeParams) storagemarket.StorageClientNode {
}

type fakeEnvironment struct {
node storagemarket.StorageClientNode
node *testnodes.FakeClientNode
dealStream *tut.TestStorageDealStream

startDataTransferChannelId datatransfer.ChannelID
Expand All @@ -657,7 +656,6 @@ type fakeEnvironment struct {
providerDealState *storagemarket.ProviderDealState
getDealStatusErr error
pollingInterval time.Duration
dealFunds *tut.TestDealFunds
peerTagger *tut.TestPeerTagger
}

Expand Down Expand Up @@ -711,10 +709,6 @@ func (fe *fakeEnvironment) PollingInterval() time.Duration {
return fe.pollingInterval
}

func (fe *fakeEnvironment) DealFunds() funds.DealFunds {
return fe.dealFunds
}

func (fe *fakeEnvironment) TagPeer(id peer.ID, ident string) {
fe.peerTagger.TagPeer(id, ident)
}
Expand Down
Loading

0 comments on commit 0588c1e

Please sign in to comment.