diff --git a/shared_testutil/test_deal_funds.go b/shared_testutil/test_deal_funds.go index 9a1017a7..853dcb65 100644 --- a/shared_testutil/test_deal_funds.go +++ b/shared_testutil/test_deal_funds.go @@ -3,8 +3,6 @@ package shared_testutil import ( "github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/go-state-types/big" - - "github.com/filecoin-project/go-fil-markets/storagemarket/impl/funds" ) func NewTestDealFunds() *TestDealFunds { @@ -34,5 +32,3 @@ func (f *TestDealFunds) Release(amount abi.TokenAmount) (abi.TokenAmount, error) f.ReleaseCalls = append(f.ReleaseCalls, amount) return f.reserved, nil } - -var _ funds.DealFunds = &TestDealFunds{} diff --git a/storagemarket/impl/client.go b/storagemarket/impl/client.go index ca5d74e6..9833d8d5 100644 --- a/storagemarket/impl/client.go +++ b/storagemarket/impl/client.go @@ -34,7 +34,6 @@ import ( "github.com/filecoin-project/go-fil-markets/storagemarket/impl/clientstates" "github.com/filecoin-project/go-fil-markets/storagemarket/impl/clientutils" "github.com/filecoin-project/go-fil-markets/storagemarket/impl/dtutils" - "github.com/filecoin-project/go-fil-markets/storagemarket/impl/funds" "github.com/filecoin-project/go-fil-markets/storagemarket/impl/requestvalidation" "github.com/filecoin-project/go-fil-markets/storagemarket/migrations" "github.com/filecoin-project/go-fil-markets/storagemarket/network" @@ -61,7 +60,6 @@ type Client struct { statemachines fsm.Group migrateStateMachines func(context.Context) error pollingInterval time.Duration - dealFunds funds.DealFunds unsubDataTransfer datatransfer.Unsubscribe } @@ -86,7 +84,6 @@ func NewClient( discovery *discoveryimpl.Local, ds datastore.Batching, scn storagemarket.StorageClientNode, - dealFunds funds.DealFunds, options ...StorageClientOption, ) (*Client, error) { carIO := cario.NewCarIO() @@ -101,7 +98,6 @@ func NewClient( pubSub: pubsub.New(clientDispatcher), readySub: pubsub.New(shared.ReadyDispatcher), pollingInterval: DefaultPollingInterval, - dealFunds: dealFunds, } storageMigrations, err := migrations.ClientMigrations.Build() if err != nil { diff --git a/storagemarket/impl/client_environments.go b/storagemarket/impl/client_environments.go index 4ba8246d..2b2cb4a8 100644 --- a/storagemarket/impl/client_environments.go +++ b/storagemarket/impl/client_environments.go @@ -12,7 +12,6 @@ import ( "github.com/filecoin-project/go-multistore" "github.com/filecoin-project/go-fil-markets/storagemarket" - "github.com/filecoin-project/go-fil-markets/storagemarket/impl/funds" "github.com/filecoin-project/go-fil-markets/storagemarket/network" ) @@ -50,10 +49,6 @@ func (c *clientDealEnvironment) PollingInterval() time.Duration { return c.c.pollingInterval } -func (c *clientDealEnvironment) DealFunds() funds.DealFunds { - return c.c.dealFunds -} - type clientStoreGetter struct { c *Client } diff --git a/storagemarket/impl/client_test.go b/storagemarket/impl/client_test.go index bda3689b..0940c6d4 100644 --- a/storagemarket/impl/client_test.go +++ b/storagemarket/impl/client_test.go @@ -125,7 +125,6 @@ func TestClient_Migrations(t *testing.T) { deps.PeerResolver, clientDs, deps.ClientNode, - deps.ClientDealFunds, storageimpl.DealPollingInterval(0), ) require.NoError(t, err) diff --git a/storagemarket/impl/clientstates/client_states.go b/storagemarket/impl/clientstates/client_states.go index 6eef21a0..5bfdca94 100644 --- a/storagemarket/impl/clientstates/client_states.go +++ b/storagemarket/impl/clientstates/client_states.go @@ -17,7 +17,6 @@ import ( "github.com/filecoin-project/go-fil-markets/shared" "github.com/filecoin-project/go-fil-markets/storagemarket" - "github.com/filecoin-project/go-fil-markets/storagemarket/impl/funds" "github.com/filecoin-project/go-fil-markets/storagemarket/impl/requestvalidation" "github.com/filecoin-project/go-fil-markets/storagemarket/network" ) @@ -33,7 +32,6 @@ type ClientDealEnvironment interface { RestartDataTransfer(ctx context.Context, chid datatransfer.ChannelID) error GetProviderDealState(ctx context.Context, proposalCid cid.Cid) (*storagemarket.ProviderDealState, error) PollingInterval() time.Duration - DealFunds() funds.DealFunds network.PeerTagger } @@ -44,34 +42,15 @@ type ClientStateEntryFunc func(ctx fsm.Context, environment ClientDealEnvironmen func EnsureClientFunds(ctx fsm.Context, environment ClientDealEnvironment, deal storagemarket.ClientDeal) error { node := environment.Node() - tok, _, err := node.GetChainHead(ctx.Context()) - if err != nil { - return ctx.Trigger(storagemarket.ClientEventEnsureFundsFailed, xerrors.Errorf("acquiring chain head: %w", err)) - } - - var requiredFunds abi.TokenAmount - if deal.FundsReserved.Nil() || deal.FundsReserved.IsZero() { - requiredFunds, err = environment.DealFunds().Reserve(deal.Proposal.ClientBalanceRequirement()) - if err != nil { - return ctx.Trigger(storagemarket.ClientEventEnsureFundsFailed, xerrors.Errorf("tracking deal funds: %w", err)) - } - } else { - requiredFunds = environment.DealFunds().Get() - } - - _ = ctx.Trigger(storagemarket.ClientEventFundsReserved, deal.Proposal.ClientBalanceRequirement()) - - mcid, err := node.EnsureFunds(ctx.Context(), deal.Proposal.Client, deal.Proposal.Client, requiredFunds, tok) - + mcid, err := node.ReserveFunds(ctx.Context(), deal.Proposal.Client, deal.Proposal.Client, deal.Proposal.ClientBalanceRequirement()) if err != nil { return ctx.Trigger(storagemarket.ClientEventEnsureFundsFailed, err) } - // if no message was sent, and there was no error, funds were already available if mcid == cid.Undef { return ctx.Trigger(storagemarket.ClientEventFundsEnsured) } - + // Otherwise wait for funds to be added return ctx.Trigger(storagemarket.ClientEventFundingInitiated, mcid) } @@ -234,14 +213,7 @@ func ValidateDealPublished(ctx fsm.Context, environment ClientDealEnvironment, d return ctx.Trigger(storagemarket.ClientEventDealPublishFailed, err) } - if !deal.FundsReserved.Nil() && !deal.FundsReserved.IsZero() { - _, err = environment.DealFunds().Release(deal.FundsReserved) - if err != nil { - // nonfatal error - log.Warnf("failed to release funds from local tracker: %s", err) - } - _ = ctx.Trigger(storagemarket.ClientEventFundsReleased, deal.FundsReserved) - } + releaseReservedFunds(ctx, environment, deal) // at this point data transfer is complete, so unprotect peer connection environment.UntagPeer(deal.Miner, deal.ProposalCid.String()) @@ -297,14 +269,7 @@ func WaitForDealCompletion(ctx fsm.Context, environment ClientDealEnvironment, d // FailDeal cleans up a failing deal func FailDeal(ctx fsm.Context, environment ClientDealEnvironment, deal storagemarket.ClientDeal) error { - if !deal.FundsReserved.Nil() && !deal.FundsReserved.IsZero() { - _, err := environment.DealFunds().Release(deal.FundsReserved) - if err != nil { - // nonfatal error - log.Warnf("failed to release funds from local tracker: %s", err) - } - _ = ctx.Trigger(storagemarket.ClientEventFundsReleased, deal.FundsReserved) - } + releaseReservedFunds(ctx, environment, deal) // TODO: store in some sort of audit log log.Errorf("deal %s failed: %s", deal.ProposalCid, deal.Message) @@ -314,6 +279,17 @@ func FailDeal(ctx fsm.Context, environment ClientDealEnvironment, deal storagema return ctx.Trigger(storagemarket.ClientEventFailed) } +func releaseReservedFunds(ctx fsm.Context, environment ClientDealEnvironment, deal storagemarket.ClientDeal) { + if !deal.FundsReserved.Nil() && !deal.FundsReserved.IsZero() { + err := environment.Node().ReleaseFunds(ctx.Context(), deal.Proposal.Client, deal.FundsReserved) + if err != nil { + // nonfatal error + log.Warnf("failed to release funds: %s", err) + } + _ = ctx.Trigger(storagemarket.ClientEventFundsReleased, deal.FundsReserved) + } +} + func isAccepted(status storagemarket.StorageDealStatus) bool { return status == storagemarket.StorageDealStaged || status == storagemarket.StorageDealSealing || diff --git a/storagemarket/impl/clientstates/client_states_test.go b/storagemarket/impl/clientstates/client_states_test.go index 62a6c9b3..f904b9f8 100644 --- a/storagemarket/impl/clientstates/client_states_test.go +++ b/storagemarket/impl/clientstates/client_states_test.go @@ -24,7 +24,6 @@ import ( tut "github.com/filecoin-project/go-fil-markets/shared_testutil" "github.com/filecoin-project/go-fil-markets/storagemarket" "github.com/filecoin-project/go-fil-markets/storagemarket/impl/clientstates" - "github.com/filecoin-project/go-fil-markets/storagemarket/impl/funds" smnet "github.com/filecoin-project/go-fil-markets/storagemarket/network" "github.com/filecoin-project/go-fil-markets/storagemarket/testnodes" ) @@ -36,8 +35,8 @@ func TestEnsureFunds(t *testing.T) { runAndInspect(t, storagemarket.StorageDealEnsureClientFunds, clientstates.EnsureClientFunds, testCase{ inspector: func(deal storagemarket.ClientDeal, env *fakeEnvironment) { tut.AssertDealState(t, storagemarket.StorageDealFundsEnsured, deal.State) - assert.Equal(t, env.dealFunds.ReserveCalls[0], deal.Proposal.ClientBalanceRequirement()) - assert.Len(t, env.dealFunds.ReleaseCalls, 0) + assert.Equal(t, env.node.DealFunds.ReserveCalls[0], deal.Proposal.ClientBalanceRequirement()) + assert.Len(t, env.node.DealFunds.ReleaseCalls, 0) assert.Equal(t, deal.Proposal.ClientBalanceRequirement(), deal.FundsReserved) }, }) @@ -49,8 +48,8 @@ func TestEnsureFunds(t *testing.T) { }, inspector: func(deal storagemarket.ClientDeal, env *fakeEnvironment) { tut.AssertDealState(t, storagemarket.StorageDealFundsEnsured, deal.State) - assert.Len(t, env.dealFunds.ReserveCalls, 0) - assert.Len(t, env.dealFunds.ReleaseCalls, 0) + assert.Len(t, env.node.DealFunds.ReserveCalls, 0) + assert.Len(t, env.node.DealFunds.ReleaseCalls, 0) }, }) }) @@ -59,8 +58,8 @@ func TestEnsureFunds(t *testing.T) { nodeParams: nodeParams{AddFundsCid: tut.GenerateCids(1)[0]}, inspector: func(deal storagemarket.ClientDeal, env *fakeEnvironment) { tut.AssertDealState(t, storagemarket.StorageDealClientFunding, deal.State) - assert.Equal(t, env.dealFunds.ReserveCalls[0], deal.Proposal.ClientBalanceRequirement()) - assert.Len(t, env.dealFunds.ReleaseCalls, 0) + assert.Equal(t, env.node.DealFunds.ReserveCalls[0], deal.Proposal.ClientBalanceRequirement()) + assert.Len(t, env.node.DealFunds.ReleaseCalls, 0) assert.Equal(t, deal.Proposal.ClientBalanceRequirement(), deal.FundsReserved) }, }) @@ -73,8 +72,8 @@ func TestEnsureFunds(t *testing.T) { inspector: func(deal storagemarket.ClientDeal, env *fakeEnvironment) { tut.AssertDealState(t, storagemarket.StorageDealFailing, deal.State) assert.Equal(t, "adding market funds failed: Something went wrong", deal.Message) - assert.Equal(t, env.dealFunds.ReserveCalls[0], deal.Proposal.ClientBalanceRequirement()) - assert.Len(t, env.dealFunds.ReleaseCalls, 0) + assert.Equal(t, env.node.DealFunds.ReserveCalls[0], deal.Proposal.ClientBalanceRequirement()) + assert.Len(t, env.node.DealFunds.ReleaseCalls, 0) assert.Equal(t, deal.Proposal.ClientBalanceRequirement(), deal.FundsReserved) }, }) @@ -391,7 +390,7 @@ func TestValidateDealPublished(t *testing.T) { inspector: func(deal storagemarket.ClientDeal, env *fakeEnvironment) { tut.AssertDealState(t, storagemarket.StorageDealSealing, deal.State) assert.Equal(t, abi.DealID(5), deal.DealID) - assert.Equal(t, env.dealFunds.ReleaseCalls[0], deal.Proposal.ClientBalanceRequirement()) + assert.Equal(t, env.node.DealFunds.ReleaseCalls[0], deal.Proposal.ClientBalanceRequirement()) assert.True(t, deal.FundsReserved.Nil() || deal.FundsReserved.IsZero()) assert.Len(t, env.peerTagger.UntagCalls, 1) assert.Equal(t, deal.Miner, env.peerTagger.UntagCalls[0]) @@ -404,7 +403,7 @@ func TestValidateDealPublished(t *testing.T) { inspector: func(deal storagemarket.ClientDeal, env *fakeEnvironment) { tut.AssertDealState(t, storagemarket.StorageDealSealing, deal.State) assert.Equal(t, abi.DealID(5), deal.DealID) - assert.Len(t, env.dealFunds.ReleaseCalls, 0) + assert.Len(t, env.node.DealFunds.ReleaseCalls, 0) assert.Len(t, env.peerTagger.UntagCalls, 1) assert.Equal(t, deal.Miner, env.peerTagger.UntagCalls[0]) }, @@ -508,7 +507,7 @@ func TestFailDeal(t *testing.T) { }, inspector: func(deal storagemarket.ClientDeal, env *fakeEnvironment) { tut.AssertDealState(t, storagemarket.StorageDealError, deal.State) - assert.Equal(t, env.dealFunds.ReleaseCalls[0], deal.Proposal.ClientBalanceRequirement()) + assert.Equal(t, env.node.DealFunds.ReleaseCalls[0], deal.Proposal.ClientBalanceRequirement()) assert.True(t, deal.FundsReserved.Nil() || deal.FundsReserved.IsZero()) }, }) @@ -517,7 +516,7 @@ func TestFailDeal(t *testing.T) { runAndInspect(t, storagemarket.StorageDealFailing, clientstates.FailDeal, testCase{ inspector: func(deal storagemarket.ClientDeal, env *fakeEnvironment) { tut.AssertDealState(t, storagemarket.StorageDealError, deal.State) - assert.Len(t, env.dealFunds.ReleaseCalls, 0) + assert.Len(t, env.node.DealFunds.ReleaseCalls, 0) assert.True(t, deal.FundsReserved.Nil() || deal.FundsReserved.IsZero()) }, }) @@ -578,7 +577,6 @@ func makeExecutor(ctx context.Context, providerDealState: envParams.providerDealState, getDealStatusErr: envParams.getDealStatusErr, pollingInterval: envParams.pollingInterval, - dealFunds: tut.NewTestDealFunds(), peerTagger: tut.NewTestPeerTagger(), } @@ -617,9 +615,10 @@ type nodeParams struct { OnDealSlashedEpoch abi.ChainEpoch } -func makeNode(params nodeParams) storagemarket.StorageClientNode { +func makeNode(params nodeParams) *testnodes.FakeClientNode { var out testnodes.FakeClientNode out.SMState = testnodes.NewStorageMarketState() + out.DealFunds = tut.NewTestDealFunds() out.AddFundsCid = params.AddFundsCid out.EnsureFundsError = params.EnsureFundsError out.VerifySignatureFails = params.VerifySignatureFails @@ -643,7 +642,7 @@ func makeNode(params nodeParams) storagemarket.StorageClientNode { } type fakeEnvironment struct { - node storagemarket.StorageClientNode + node *testnodes.FakeClientNode dealStream *tut.TestStorageDealStream startDataTransferChannelId datatransfer.ChannelID @@ -657,7 +656,6 @@ type fakeEnvironment struct { providerDealState *storagemarket.ProviderDealState getDealStatusErr error pollingInterval time.Duration - dealFunds *tut.TestDealFunds peerTagger *tut.TestPeerTagger } @@ -711,10 +709,6 @@ func (fe *fakeEnvironment) PollingInterval() time.Duration { return fe.pollingInterval } -func (fe *fakeEnvironment) DealFunds() funds.DealFunds { - return fe.dealFunds -} - func (fe *fakeEnvironment) TagPeer(id peer.ID, ident string) { fe.peerTagger.TagPeer(id, ident) } diff --git a/storagemarket/impl/funds/funds.go b/storagemarket/impl/funds/funds.go deleted file mode 100644 index 6def82a9..00000000 --- a/storagemarket/impl/funds/funds.go +++ /dev/null @@ -1,107 +0,0 @@ -package funds - -import ( - "bytes" - "sync" - - "github.com/ipfs/go-datastore" - "golang.org/x/xerrors" - - "github.com/filecoin-project/go-state-types/abi" - "github.com/filecoin-project/go-state-types/big" -) - -// DealFunds is used to track funds needed for (possibly multiple) deals in progress -type DealFunds interface { - // returns the current amount tracked - Get() abi.TokenAmount - - // Reserve is used to mark funds as "in-use" for a deal - // returns the new amount tracked - Reserve(amount abi.TokenAmount) (abi.TokenAmount, error) - - // Release releases reserved committed funds back to the available pool - // returns total amount reserved afterwards - Release(amount abi.TokenAmount) (abi.TokenAmount, error) -} - -type dealFundsImpl struct { - lock sync.Mutex - - // cached value - reserved abi.TokenAmount - - key datastore.Key - ds datastore.Batching -} - -func NewDealFunds(ds datastore.Batching, key datastore.Key) (DealFunds, error) { - df := &dealFundsImpl{ - ds: ds, - key: key, - } - - value, err := df.loadReserved() - if err != nil { - return nil, err - } - - df.reserved = value - - return df, nil -} - -func (f *dealFundsImpl) Get() abi.TokenAmount { - return f.reserved -} - -func (f *dealFundsImpl) Reserve(amount abi.TokenAmount) (abi.TokenAmount, error) { - f.lock.Lock() - defer f.lock.Unlock() - - return f.storeReserved(big.Add(f.reserved, amount)) -} - -func (f *dealFundsImpl) Release(amount abi.TokenAmount) (abi.TokenAmount, error) { - f.lock.Lock() - defer f.lock.Unlock() - - return f.storeReserved(big.Sub(f.reserved, amount)) -} - -// loadReserved will try to load our reserved value from the datastore -// if it cannot find our key, it will return zero -func (f *dealFundsImpl) loadReserved() (abi.TokenAmount, error) { - b, err := f.ds.Get(f.key) - if err != nil { - if xerrors.Is(err, datastore.ErrNotFound) { - f.reserved = big.Zero() - return f.reserved, nil - } - return abi.TokenAmount{}, err - } - - var value abi.TokenAmount - if err = value.UnmarshalCBOR(bytes.NewReader(b)); err != nil { - return abi.TokenAmount{}, err - } - - f.reserved = value - return f.reserved, nil -} - -// stores the new reserved value and returns it -func (f *dealFundsImpl) storeReserved(amount abi.TokenAmount) (abi.TokenAmount, error) { - var buf bytes.Buffer - err := amount.MarshalCBOR(&buf) - if err != nil { - return abi.TokenAmount{}, err - } - - if err := f.ds.Put(f.key, buf.Bytes()); err != nil { - return abi.TokenAmount{}, err - } - - f.reserved = amount - return f.reserved, nil -} diff --git a/storagemarket/impl/funds/funds_test.go b/storagemarket/impl/funds/funds_test.go deleted file mode 100644 index 42946721..00000000 --- a/storagemarket/impl/funds/funds_test.go +++ /dev/null @@ -1,49 +0,0 @@ -package funds_test - -import ( - "testing" - - "github.com/ipfs/go-datastore" - dss "github.com/ipfs/go-datastore/sync" - "github.com/stretchr/testify/assert" - - "github.com/filecoin-project/go-state-types/abi" - "github.com/filecoin-project/go-state-types/big" - - "github.com/filecoin-project/go-fil-markets/storagemarket/impl/funds" -) - -func TestDealFunds(t *testing.T) { - ds := dss.MutexWrap(datastore.NewMapDatastore()) - key := datastore.NewKey("deal_funds_test") - - f, err := funds.NewDealFunds(ds, key) - assert.NoError(t, err) - - // initializes to zero - assert.Equal(t, f.Get(), big.Zero()) - - // reserve funds and return new total - newAmount, err := f.Reserve(abi.NewTokenAmount(123)) - assert.NoError(t, err) - assert.Equal(t, abi.NewTokenAmount(123), newAmount) - assert.Equal(t, abi.NewTokenAmount(123), f.Get()) - - // reserve more funds and return new total - newAmount, err = f.Reserve(abi.NewTokenAmount(100)) - assert.NoError(t, err) - assert.Equal(t, abi.NewTokenAmount(223), newAmount) - assert.Equal(t, abi.NewTokenAmount(223), f.Get()) - - // release funds and return new total - newAmount, err = f.Release(abi.NewTokenAmount(123)) - assert.NoError(t, err) - assert.Equal(t, abi.NewTokenAmount(100), newAmount) - assert.Equal(t, abi.NewTokenAmount(100), f.Get()) - - // creating new funds will read stored value - f, err = funds.NewDealFunds(ds, key) - assert.NoError(t, err) - assert.Equal(t, abi.NewTokenAmount(100), newAmount) - assert.Equal(t, abi.NewTokenAmount(100), f.Get()) -} diff --git a/storagemarket/impl/provider.go b/storagemarket/impl/provider.go index fff4d13a..a09c03a3 100644 --- a/storagemarket/impl/provider.go +++ b/storagemarket/impl/provider.go @@ -29,7 +29,6 @@ import ( "github.com/filecoin-project/go-fil-markets/storagemarket" "github.com/filecoin-project/go-fil-markets/storagemarket/impl/connmanager" "github.com/filecoin-project/go-fil-markets/storagemarket/impl/dtutils" - "github.com/filecoin-project/go-fil-markets/storagemarket/impl/funds" "github.com/filecoin-project/go-fil-markets/storagemarket/impl/providerstates" "github.com/filecoin-project/go-fil-markets/storagemarket/impl/providerutils" "github.com/filecoin-project/go-fil-markets/storagemarket/impl/requestvalidation" @@ -59,7 +58,6 @@ type Provider struct { pieceStore piecestore.PieceStore conns *connmanager.ConnManager storedAsk StoredAsk - dealFunds funds.DealFunds actor address.Address dataTransfer datatransfer.Manager universalRetrievalEnabled bool @@ -111,7 +109,6 @@ func NewProvider(net network.StorageMarketNetwork, minerAddress address.Address, rt abi.RegisteredSealProof, storedAsk StoredAsk, - dealFunds funds.DealFunds, options ...StorageProviderOption, ) (storagemarket.StorageProvider, error) { carIO := cario.NewCarIO() @@ -127,7 +124,6 @@ func NewProvider(net network.StorageMarketNetwork, pieceStore: pieceStore, conns: connmanager.NewConnManager(), storedAsk: storedAsk, - dealFunds: dealFunds, actor: minerAddress, dataTransfer: dataTransfer, pubSub: pubsub.New(providerDispatcher), diff --git a/storagemarket/impl/provider_environments.go b/storagemarket/impl/provider_environments.go index 81d3e640..9255cc33 100644 --- a/storagemarket/impl/provider_environments.go +++ b/storagemarket/impl/provider_environments.go @@ -16,7 +16,6 @@ import ( "github.com/filecoin-project/go-fil-markets/filestore" "github.com/filecoin-project/go-fil-markets/piecestore" "github.com/filecoin-project/go-fil-markets/storagemarket" - "github.com/filecoin-project/go-fil-markets/storagemarket/impl/funds" "github.com/filecoin-project/go-fil-markets/storagemarket/impl/providerstates" "github.com/filecoin-project/go-fil-markets/storagemarket/impl/providerutils" "github.com/filecoin-project/go-fil-markets/storagemarket/network" @@ -106,10 +105,6 @@ func (p *providerDealEnvironment) RunCustomDecisionLogic(ctx context.Context, de return p.p.customDealDeciderFunc(ctx, deal) } -func (p *providerDealEnvironment) DealFunds() funds.DealFunds { - return p.p.dealFunds -} - func (p *providerDealEnvironment) TagPeer(id peer.ID, s string) { p.p.net.TagPeer(id, s) } diff --git a/storagemarket/impl/provider_test.go b/storagemarket/impl/provider_test.go index 9cc40c7a..74255b88 100644 --- a/storagemarket/impl/provider_test.go +++ b/storagemarket/impl/provider_test.go @@ -138,7 +138,6 @@ func TestProvider_Migrations(t *testing.T) { deps.ProviderAddr, abi.RegisteredSealProof_StackedDrg2KiBV1, deps.StoredAsk, - deps.ProviderDealFunds, ) require.NoError(t, err) @@ -231,7 +230,6 @@ func TestHandleDealStream(t *testing.T) { deps.ProviderAddr, abi.RegisteredSealProof_StackedDrg2KiBV1, deps.StoredAsk, - deps.ProviderDealFunds, ) require.NoError(t, err) diff --git a/storagemarket/impl/providerstates/provider_states.go b/storagemarket/impl/providerstates/provider_states.go index ae2027c6..389993fd 100644 --- a/storagemarket/impl/providerstates/provider_states.go +++ b/storagemarket/impl/providerstates/provider_states.go @@ -25,7 +25,6 @@ import ( "github.com/filecoin-project/go-fil-markets/piecestore" "github.com/filecoin-project/go-fil-markets/shared" "github.com/filecoin-project/go-fil-markets/storagemarket" - "github.com/filecoin-project/go-fil-markets/storagemarket/impl/funds" "github.com/filecoin-project/go-fil-markets/storagemarket/impl/providerutils" "github.com/filecoin-project/go-fil-markets/storagemarket/network" ) @@ -49,7 +48,6 @@ type ProviderDealEnvironment interface { FileStore() filestore.FileStore PieceStore() piecestore.PieceStore RunCustomDecisionLogic(context.Context, storagemarket.MinerDeal) (bool, string, error) - DealFunds() funds.DealFunds network.PeerTagger } @@ -226,25 +224,13 @@ func EnsureProviderFunds(ctx fsm.Context, environment ProviderDealEnvironment, d if err != nil { return ctx.Trigger(storagemarket.ProviderEventNodeErrored, xerrors.Errorf("looking up miner worker: %w", err)) } - var requiredFunds abi.TokenAmount - if deal.FundsReserved.Nil() || deal.FundsReserved.IsZero() { - requiredFunds, err = environment.DealFunds().Reserve(deal.Proposal.ProviderCollateral) - if err != nil { - return ctx.Trigger(storagemarket.ProviderEventTrackFundsFailed, xerrors.Errorf("tracking deal funds: %w", err)) - } - } else { - requiredFunds = environment.DealFunds().Get() - } - - _ = ctx.Trigger(storagemarket.ProviderEventFundsReserved, deal.Proposal.ProviderCollateral) - - mcid, err := node.EnsureFunds(ctx.Context(), deal.Proposal.Provider, waddr, requiredFunds, tok) + mcid, err := node.ReserveFunds(ctx.Context(), waddr, deal.Proposal.Provider, deal.Proposal.ProviderCollateral) if err != nil { - return ctx.Trigger(storagemarket.ProviderEventNodeErrored, xerrors.Errorf("ensuring funds: %w", err)) + return ctx.Trigger(storagemarket.ClientEventEnsureFundsFailed, err) } - // if no message was sent, and there was no error, it was instantaneous + // if no message was sent, and there was no error, funds were already available if mcid == cid.Undef { return ctx.Trigger(storagemarket.ProviderEventFunded) } @@ -324,14 +310,7 @@ func WaitForPublish(ctx fsm.Context, environment ProviderDealEnvironment, deal s return ctx.Trigger(storagemarket.ProviderEventDealPublishError, xerrors.Errorf("PublishStorageDeals error unmarshalling result: %w", err)) } - if !deal.FundsReserved.Nil() && !deal.FundsReserved.IsZero() { - _, err = environment.DealFunds().Release(deal.FundsReserved) - if err != nil { - // nonfatal error - log.Warnf("failed to release funds from local tracker: %s", err) - } - _ = ctx.Trigger(storagemarket.ProviderEventFundsReleased, deal.FundsReserved) - } + releaseReservedFunds(ctx, environment, deal) return ctx.Trigger(storagemarket.ProviderEventDealPublished, retval.IDs[0], finalCid) }) @@ -520,14 +499,18 @@ func FailDeal(ctx fsm.Context, environment ProviderDealEnvironment, deal storage log.Warnf("deleting store id %d: %w", *deal.StoreID, err) } } + releaseReservedFunds(ctx, environment, deal) + + return ctx.Trigger(storagemarket.ProviderEventFailed) +} + +func releaseReservedFunds(ctx fsm.Context, environment ProviderDealEnvironment, deal storagemarket.MinerDeal) { if !deal.FundsReserved.Nil() && !deal.FundsReserved.IsZero() { - _, err := environment.DealFunds().Release(deal.FundsReserved) + err := environment.Node().ReleaseFunds(ctx.Context(), deal.Proposal.Provider, deal.FundsReserved) if err != nil { // nonfatal error - log.Warnf("failed to release funds from local tracker: %s", err) + log.Warnf("failed to release funds: %s", err) } _ = ctx.Trigger(storagemarket.ProviderEventFundsReleased, deal.FundsReserved) } - - return ctx.Trigger(storagemarket.ProviderEventFailed) } diff --git a/storagemarket/impl/providerstates/provider_states_test.go b/storagemarket/impl/providerstates/provider_states_test.go index e0a536e7..d907b2b8 100644 --- a/storagemarket/impl/providerstates/provider_states_test.go +++ b/storagemarket/impl/providerstates/provider_states_test.go @@ -37,7 +37,6 @@ import ( tut "github.com/filecoin-project/go-fil-markets/shared_testutil" "github.com/filecoin-project/go-fil-markets/storagemarket" "github.com/filecoin-project/go-fil-markets/storagemarket/impl/blockrecorder" - "github.com/filecoin-project/go-fil-markets/storagemarket/impl/funds" "github.com/filecoin-project/go-fil-markets/storagemarket/impl/providerstates" "github.com/filecoin-project/go-fil-markets/storagemarket/network" "github.com/filecoin-project/go-fil-markets/storagemarket/testnodes" @@ -439,8 +438,8 @@ func TestEnsureProviderFunds(t *testing.T) { "succeeds immediately": { dealInspector: func(t *testing.T, deal storagemarket.MinerDeal, env *fakeEnvironment) { tut.AssertDealState(t, storagemarket.StorageDealPublish, deal.State) - require.Equal(t, env.dealFunds.ReserveCalls[0], deal.Proposal.ProviderBalanceRequirement()) - require.Len(t, env.dealFunds.ReleaseCalls, 0) + require.Equal(t, env.node.DealFunds.ReserveCalls[0], deal.Proposal.ProviderBalanceRequirement()) + require.Len(t, env.node.DealFunds.ReleaseCalls, 0) require.Equal(t, deal.Proposal.ProviderBalanceRequirement(), deal.FundsReserved) }, }, @@ -450,8 +449,8 @@ func TestEnsureProviderFunds(t *testing.T) { }, dealInspector: func(t *testing.T, deal storagemarket.MinerDeal, env *fakeEnvironment) { tut.AssertDealState(t, storagemarket.StorageDealPublish, deal.State) - require.Len(t, env.dealFunds.ReserveCalls, 0) - require.Len(t, env.dealFunds.ReleaseCalls, 0) + require.Len(t, env.node.DealFunds.ReserveCalls, 0) + require.Len(t, env.node.DealFunds.ReleaseCalls, 0) }, }, "succeeds by sending an AddBalance message": { @@ -464,8 +463,8 @@ func TestEnsureProviderFunds(t *testing.T) { dealInspector: func(t *testing.T, deal storagemarket.MinerDeal, env *fakeEnvironment) { tut.AssertDealState(t, storagemarket.StorageDealProviderFunding, deal.State) require.Equal(t, &cids[0], deal.AddFundsCid) - require.Equal(t, env.dealFunds.ReserveCalls[0], deal.Proposal.ProviderBalanceRequirement()) - require.Len(t, env.dealFunds.ReleaseCalls, 0) + require.Equal(t, env.node.DealFunds.ReserveCalls[0], deal.Proposal.ProviderBalanceRequirement()) + require.Len(t, env.node.DealFunds.ReleaseCalls, 0) }, }, "get miner worker fails": { @@ -484,8 +483,8 @@ func TestEnsureProviderFunds(t *testing.T) { dealInspector: func(t *testing.T, deal storagemarket.MinerDeal, env *fakeEnvironment) { tut.AssertDealState(t, storagemarket.StorageDealFailing, deal.State) require.Equal(t, "error calling node: ensuring funds: not enough funds", deal.Message) - require.Equal(t, env.dealFunds.ReserveCalls[0], deal.Proposal.ProviderBalanceRequirement()) - require.Len(t, env.dealFunds.ReleaseCalls, 0) + require.Equal(t, env.node.DealFunds.ReserveCalls[0], deal.Proposal.ProviderBalanceRequirement()) + require.Len(t, env.node.DealFunds.ReleaseCalls, 0) }, }, } @@ -609,7 +608,7 @@ func TestWaitForPublish(t *testing.T) { dealInspector: func(t *testing.T, deal storagemarket.MinerDeal, env *fakeEnvironment) { tut.AssertDealState(t, storagemarket.StorageDealStaged, deal.State) require.Equal(t, expDealID, deal.DealID) - assert.Equal(t, env.dealFunds.ReleaseCalls[0], deal.Proposal.ProviderBalanceRequirement()) + assert.Equal(t, env.node.DealFunds.ReleaseCalls[0], deal.Proposal.ProviderBalanceRequirement()) assert.True(t, deal.FundsReserved.Nil() || deal.FundsReserved.IsZero()) assert.Equal(t, deal.PublishCid, &finalCid) }, @@ -621,7 +620,7 @@ func TestWaitForPublish(t *testing.T) { dealInspector: func(t *testing.T, deal storagemarket.MinerDeal, env *fakeEnvironment) { tut.AssertDealState(t, storagemarket.StorageDealStaged, deal.State) require.Equal(t, expDealID, deal.DealID) - assert.Len(t, env.dealFunds.ReleaseCalls, 0) + assert.Len(t, env.node.DealFunds.ReleaseCalls, 0) }, }, "PublishStorageDeal errors": { @@ -997,7 +996,7 @@ func TestFailDeal(t *testing.T) { }, dealInspector: func(t *testing.T, deal storagemarket.MinerDeal, env *fakeEnvironment) { tut.AssertDealState(t, storagemarket.StorageDealError, deal.State) - assert.Equal(t, env.dealFunds.ReleaseCalls[0], deal.Proposal.ProviderBalanceRequirement()) + assert.Equal(t, env.node.DealFunds.ReleaseCalls[0], deal.Proposal.ProviderBalanceRequirement()) assert.True(t, deal.FundsReserved.Nil() || deal.FundsReserved.IsZero()) }, }, @@ -1180,6 +1179,7 @@ func makeExecutor(ctx context.Context, common := testnodes.FakeCommonNode{ SMState: smstate, + DealFunds: tut.NewTestDealFunds(), GetChainHeadError: nodeParams.MostRecentStateIDError, GetBalanceError: nodeParams.ClientMarketBalanceError, VerifySignatureFails: nodeParams.VerifySignatureFails, @@ -1304,7 +1304,6 @@ func makeExecutor(ctx context.Context, deleteStoreError: params.DeleteStoreError, fs: fs, pieceStore: pieceStore, - dealFunds: tut.NewTestDealFunds(), peerTagger: tut.NewTestPeerTagger(), restartDataTransferError: params.RestartDataTransferError, @@ -1361,7 +1360,6 @@ type fakeEnvironment struct { pieceStore piecestore.PieceStore expectedTags map[string]struct{} receivedTags map[string]struct{} - dealFunds *tut.TestDealFunds peerTagger *tut.TestPeerTagger restartDataTransferCalls []restartDataTransferCall @@ -1418,10 +1416,6 @@ func (fe *fakeEnvironment) RunCustomDecisionLogic(context.Context, storagemarket return !fe.rejectDeal, fe.rejectReason, fe.decisionError } -func (fe *fakeEnvironment) DealFunds() funds.DealFunds { - return fe.dealFunds -} - func (fe *fakeEnvironment) TagPeer(id peer.ID, s string) { fe.peerTagger.TagPeer(id, s) } diff --git a/storagemarket/nodes.go b/storagemarket/nodes.go index de8ed0a4..430d6ff8 100644 --- a/storagemarket/nodes.go +++ b/storagemarket/nodes.go @@ -34,10 +34,11 @@ type StorageCommon interface { // Adds funds with the StorageMinerActor for a storage participant. Used by both providers and clients. AddFunds(ctx context.Context, addr address.Address, amount abi.TokenAmount) (cid.Cid, error) - // EnsureFunds ensures that a storage market participant has a certain amount of available funds - // If additional funds are needed, they will be sent from the 'wallet' address, and a cid for the - // corresponding chain message is returned - EnsureFunds(ctx context.Context, addr, wallet address.Address, amount abi.TokenAmount, tok shared.TipSetToken) (cid.Cid, error) + // ReserveFunds ensures that the given amount of funds is available for the deal + ReserveFunds(ctx context.Context, wallet, addr address.Address, amt abi.TokenAmount) (cid.Cid, error) + + // ReleaseFunds releases funds reserved with ReserveFunds + ReleaseFunds(ctx context.Context, addr address.Address, amt abi.TokenAmount) error // GetBalance returns locked/unlocked for a storage participant. Used by both providers and clients. GetBalance(ctx context.Context, addr address.Address, tok shared.TipSetToken) (Balance, error) diff --git a/storagemarket/testharness/dependencies/dependencies.go b/storagemarket/testharness/dependencies/dependencies.go index b0e9ac8a..c53736fe 100644 --- a/storagemarket/testharness/dependencies/dependencies.go +++ b/storagemarket/testharness/dependencies/dependencies.go @@ -28,7 +28,6 @@ import ( piecestoreimpl "github.com/filecoin-project/go-fil-markets/piecestore/impl" "github.com/filecoin-project/go-fil-markets/shared_testutil" "github.com/filecoin-project/go-fil-markets/storagemarket" - "github.com/filecoin-project/go-fil-markets/storagemarket/impl/funds" "github.com/filecoin-project/go-fil-markets/storagemarket/impl/storedask" "github.com/filecoin-project/go-fil-markets/storagemarket/testnodes" ) @@ -52,9 +51,7 @@ type StorageDependencies struct { ClientDelayFakeCommonNode testnodes.DelayFakeCommonNode ProviderClientDelayFakeCommonNode testnodes.DelayFakeCommonNode Fs filestore.FileStore - ClientDealFunds funds.DealFunds StoredAsk *storedask.StoredAsk - ProviderDealFunds funds.DealFunds } func NewDependenciesWithTestData(t *testing.T, ctx context.Context, td *shared_testutil.Libp2pTestData, smState *testnodes.StorageMarketState, tempPath string, @@ -69,7 +66,9 @@ func NewDependenciesWithTestData(t *testing.T, ctx context.Context, td *shared_t epoch := abi.ChainEpoch(100) clientNode := testnodes.FakeClientNode{ - FakeCommonNode: testnodes.FakeCommonNode{SMState: smState, + FakeCommonNode: testnodes.FakeCommonNode{ + SMState: smState, + DealFunds: shared_testutil.NewTestDealFunds(), DelayFakeCommonNode: cd}, ClientAddr: address.TestAddress, ExpectedMinerInfos: []address.Address{address.TestAddress2}, @@ -96,6 +95,7 @@ func NewDependenciesWithTestData(t *testing.T, ctx context.Context, td *shared_t FakeCommonNode: testnodes.FakeCommonNode{ DelayFakeCommonNode: pd, SMState: smState, + DealFunds: shared_testutil.NewTestDealFunds(), WaitForMessageRetBytes: psdReturnBytes.Bytes(), }, MinerAddr: providerAddr, @@ -111,10 +111,6 @@ func NewDependenciesWithTestData(t *testing.T, ctx context.Context, td *shared_t require.NoError(t, err) testutil.StartAndWaitForReady(ctx, t, dt1) - require.NoError(t, err) - clientDealFunds, err := funds.NewDealFunds(td.Ds1, datastore.NewKey("storage/client/dealfunds")) - require.NoError(t, err) - discovery, err := discoveryimpl.NewLocal(namespace.Wrap(td.Ds1, datastore.NewKey("/deals/local"))) require.NoError(t, err) shared_testutil.StartAndWaitForReady(ctx, t, discovery) @@ -128,8 +124,6 @@ func NewDependenciesWithTestData(t *testing.T, ctx context.Context, td *shared_t storedAskDs := namespace.Wrap(td.Ds2, datastore.NewKey("/storage/ask")) storedAsk, err := storedask.NewStoredAsk(storedAskDs, datastore.NewKey("latest-ask"), providerNode, providerAddr) assert.NoError(t, err) - providerDealFunds, err := funds.NewDealFunds(td.Ds2, datastore.NewKey("storage/provider/dealfunds")) - assert.NoError(t, err) // Closely follows the MinerInfo struct in the spec providerInfo := storagemarket.StorageProviderInfo{ @@ -159,8 +153,6 @@ func NewDependenciesWithTestData(t *testing.T, ctx context.Context, td *shared_t PeerResolver: discovery, PieceStore: ps, Fs: fs, - ClientDealFunds: clientDealFunds, StoredAsk: storedAsk, - ProviderDealFunds: providerDealFunds, } } diff --git a/storagemarket/testharness/testharness.go b/storagemarket/testharness/testharness.go index 5aca9e56..5ded6d78 100644 --- a/storagemarket/testharness/testharness.go +++ b/storagemarket/testharness/testharness.go @@ -74,7 +74,6 @@ func NewHarnessWithTestData(t *testing.T, ctx context.Context, td *shared_testut deps.PeerResolver, clientDs, deps.ClientNode, - deps.ClientDealFunds, storageimpl.DealPollingInterval(0), ) require.NoError(t, err) @@ -99,7 +98,6 @@ func NewHarnessWithTestData(t *testing.T, ctx context.Context, td *shared_testut deps.ProviderAddr, abi.RegisteredSealProof_StackedDrg2KiBV1, deps.StoredAsk, - deps.ProviderDealFunds, ) assert.NoError(t, err) @@ -135,7 +133,6 @@ func (h *StorageHarness) CreateNewProvider(t *testing.T, ctx context.Context, td h.ProviderAddr, abi.RegisteredSealProof_StackedDrg2KiBV1, h.StoredAsk, - h.ProviderDealFunds, ) require.NoError(t, err) h.Provider = provider diff --git a/storagemarket/testnodes/testnodes.go b/storagemarket/testnodes/testnodes.go index f7e8e11a..0631b481 100644 --- a/storagemarket/testnodes/testnodes.go +++ b/storagemarket/testnodes/testnodes.go @@ -74,6 +74,7 @@ func (sma *StorageMarketState) StateKey() (shared.TipSetToken, abi.ChainEpoch) { // where responses are stubbed type FakeCommonNode struct { SMState *StorageMarketState + DealFunds *shared_testutil.TestDealFunds AddFundsCid cid.Cid EnsureFundsError error VerifySignatureFails bool @@ -126,18 +127,25 @@ func (n *FakeCommonNode) AddFunds(ctx context.Context, addr address.Address, amo return n.AddFundsCid, nil } -// EnsureFunds adds funds to the given actor in the storage market state to ensure it has at least the given amount -func (n *FakeCommonNode) EnsureFunds(ctx context.Context, addr, wallet address.Address, amount abi.TokenAmount, tok shared.TipSetToken) (cid.Cid, error) { +// ReserveFunds reserves funds required for a deal with the storage market actor +func (n *FakeCommonNode) ReserveFunds(ctx context.Context, wallet, addr address.Address, amt abi.TokenAmount) (cid.Cid, error) { + n.DealFunds.Reserve(amt) if n.EnsureFundsError == nil { balance := n.SMState.Balance(addr) - if balance.Available.LessThan(amount) { - return n.AddFunds(ctx, addr, big.Sub(amount, balance.Available)) + if balance.Available.LessThan(amt) { + return n.AddFunds(ctx, addr, big.Sub(amt, balance.Available)) } } return cid.Undef, n.EnsureFundsError } +// ReleaseFunds releases funds reserved with ReserveFunds +func (n *FakeCommonNode) ReleaseFunds(ctx context.Context, addr address.Address, amt abi.TokenAmount) error { + n.DealFunds.Release(amt) + return nil +} + // WaitForMessage simulates waiting for a message to appear on chain func (n *FakeCommonNode) WaitForMessage(ctx context.Context, mcid cid.Cid, onCompletion func(exitcode.ExitCode, []byte, cid.Cid, error) error) error { n.WaitForMessageCalls = append(n.WaitForMessageCalls, mcid)