From 3e7b8cb41b8d7a4375f73c346fc3cfc19fd548d2 Mon Sep 17 00:00:00 2001 From: Hannah Howard Date: Tue, 5 May 2020 13:50:38 -0700 Subject: [PATCH] Storage Market Changes Based On Lotus Integration (#223) * chore(deps): update go-ipfs-ds-help update go-ipfs-ds-help and go-ipfs-blockstore to match lotus * feat(storagemarket): cleanup WaitForMessage Modify WaitForMessage signature -- as it turns out, we cannot wait for messages with confidences, only state changes (for the most part) * feat(storagemarket): add context to WaitForMessage add a context to WaitForMessage since generally wait apis on node expect a context --- go.mod | 4 +- go.sum | 4 ++ retrievalmarket/discovery/local.go | 4 +- storagemarket/impl/client.go | 10 +-- .../impl/clientstates/client_states.go | 12 ++-- storagemarket/impl/provider.go | 10 +-- .../impl/providerstates/provider_states.go | 62 ++++++++++--------- storagemarket/testnodes/testnodes.go | 13 ++-- storagemarket/types.go | 3 +- 9 files changed, 70 insertions(+), 52 deletions(-) diff --git a/go.mod b/go.mod index 91da9d3d..05ab89a8 100644 --- a/go.mod +++ b/go.mod @@ -19,10 +19,10 @@ require ( github.com/ipfs/go-cid v0.0.5 github.com/ipfs/go-datastore v0.4.4 github.com/ipfs/go-graphsync v0.0.6-0.20200504202014-9d5f2c26a103 - github.com/ipfs/go-ipfs-blockstore v0.1.4 + github.com/ipfs/go-ipfs-blockstore v1.0.0 github.com/ipfs/go-ipfs-blocksutil v0.0.1 github.com/ipfs/go-ipfs-chunker v0.0.5 - github.com/ipfs/go-ipfs-ds-help v0.1.1 + github.com/ipfs/go-ipfs-ds-help v1.0.0 github.com/ipfs/go-ipfs-exchange-offline v0.0.1 github.com/ipfs/go-ipfs-files v0.0.8 github.com/ipfs/go-ipld-cbor v0.0.4 diff --git a/go.sum b/go.sum index 565378b1..27e060b7 100644 --- a/go.sum +++ b/go.sum @@ -287,6 +287,8 @@ github.com/ipfs/go-ipfs-blockstore v0.1.0/go.mod h1:5aD0AvHPi7mZc6Ci1WCAhiBQu2Is github.com/ipfs/go-ipfs-blockstore v0.1.1/go.mod h1:8gZOgIN5e+Xdg2YSGdwTTRbguSVjYyosIDRQCY8E9QM= github.com/ipfs/go-ipfs-blockstore v0.1.4 h1:2SGI6U1B44aODevza8Rde3+dY30Pb+lbcObe1LETxOQ= github.com/ipfs/go-ipfs-blockstore v0.1.4/go.mod h1:Jxm3XMVjh6R17WvxFEiyKBLUGr86HgIYJW/D/MwqeYQ= +github.com/ipfs/go-ipfs-blockstore v1.0.0 h1:pmFp5sFYsYVvMOp9X01AK3s85usVcLvkBTRsN6SnfUA= +github.com/ipfs/go-ipfs-blockstore v1.0.0/go.mod h1:knLVdhVU9L7CC4T+T4nvGdeUIPAXlnd9zmXfp+9MIjU= github.com/ipfs/go-ipfs-blocksutil v0.0.1 h1:Eh/H4pc1hsvhzsQoMEP3Bke/aW5P5rVM1IWFJMcGIPQ= github.com/ipfs/go-ipfs-blocksutil v0.0.1/go.mod h1:Yq4M86uIOmxmGPUHv/uI7uKqZNtLb449gwKqXjIsnRk= github.com/ipfs/go-ipfs-chunker v0.0.1 h1:cHUUxKFQ99pozdahi+uSC/3Y6HeRpi9oTeUHbE27SEw= @@ -300,6 +302,8 @@ github.com/ipfs/go-ipfs-ds-help v0.0.1 h1:QBg+Ts2zgeemK/dB0saiF/ykzRGgfoFMT90Rzo github.com/ipfs/go-ipfs-ds-help v0.0.1/go.mod h1:gtP9xRaZXqIQRh1HRpp595KbBEdgqWFxefeVKOV8sxo= github.com/ipfs/go-ipfs-ds-help v0.1.1 h1:IW/bXGeaAZV2VH0Kuok+Ohva/zHkHmeLFBxC1k7mNPc= github.com/ipfs/go-ipfs-ds-help v0.1.1/go.mod h1:SbBafGJuGsPI/QL3j9Fc5YPLeAu+SzOkI0gFwAg+mOs= +github.com/ipfs/go-ipfs-ds-help v1.0.0 h1:bEQ8hMGs80h0sR8O4tfDgV6B01aaF9qeTrujrTLYV3g= +github.com/ipfs/go-ipfs-ds-help v1.0.0/go.mod h1:ujAbkeIgkKAWtxxNkoZHWLCyk5JpPoKnGyCcsoF6ueE= github.com/ipfs/go-ipfs-exchange-interface v0.0.1 h1:LJXIo9W7CAmugqI+uofioIpRb6rY30GUu7G6LUfpMvM= github.com/ipfs/go-ipfs-exchange-interface v0.0.1/go.mod h1:c8MwfHjtQjPoDyiy9cFquVtVHkO9b9Ob3FG91qJnWCM= github.com/ipfs/go-ipfs-exchange-offline v0.0.1 h1:P56jYKZF7lDDOLx5SotVh5KFxoY6C81I1NSHW1FxGew= diff --git a/retrievalmarket/discovery/local.go b/retrievalmarket/discovery/local.go index 9efa49c5..42384f1f 100644 --- a/retrievalmarket/discovery/local.go +++ b/retrievalmarket/discovery/local.go @@ -19,7 +19,7 @@ func NewLocal(ds datastore.Batching) *Local { } func (l *Local) AddPeer(cid cid.Cid, peer retrievalmarket.RetrievalPeer) error { - key := dshelp.CidToDsKey(cid) + key := dshelp.MultihashToDsKey(cid.Hash()) exists, err := l.ds.Has(key) if err != nil { return err @@ -64,7 +64,7 @@ func hasPeer(peerList []retrievalmarket.RetrievalPeer, peer retrievalmarket.Retr } func (l *Local) GetPeers(payloadCID cid.Cid) ([]retrievalmarket.RetrievalPeer, error) { - entry, err := l.ds.Get(dshelp.CidToDsKey(payloadCID)) + entry, err := l.ds.Get(dshelp.MultihashToDsKey(payloadCID.Hash())) if err == datastore.ErrNotFound { return []retrievalmarket.RetrievalPeer{}, nil } diff --git a/storagemarket/impl/client.go b/storagemarket/impl/client.go index 841c2e77..2ece5c75 100644 --- a/storagemarket/impl/client.go +++ b/storagemarket/impl/client.go @@ -286,11 +286,13 @@ func (c *Client) AddPaymentEscrow(ctx context.Context, addr address.Address, amo return err } - err = c.node.WaitForMessage(mcid, storagemarket.ChainConfidence, func(code exitcode.ExitCode, bytes []byte) error { - if code == exitcode.Ok { - done <- nil + err = c.node.WaitForMessage(ctx, mcid, func(code exitcode.ExitCode, bytes []byte, err error) error { + if err != nil { + done <- xerrors.Errorf("AddFunds errored: %w", err) + } else if code != exitcode.Ok { + done <- xerrors.Errorf("AddFunds error, exit code: %s", code.String()) } else { - done <- xerrors.Errorf("AddFunds error, exit code: %w", code) + done <- nil } return nil }) diff --git a/storagemarket/impl/clientstates/client_states.go b/storagemarket/impl/clientstates/client_states.go index 79b73664..69ff79c9 100644 --- a/storagemarket/impl/clientstates/client_states.go +++ b/storagemarket/impl/clientstates/client_states.go @@ -53,11 +53,15 @@ func EnsureClientFunds(ctx fsm.Context, environment ClientDealEnvironment, deal func WaitForFunding(ctx fsm.Context, environment ClientDealEnvironment, deal storagemarket.ClientDeal) error { node := environment.Node() - return node.WaitForMessage(deal.AddFundsCid, storagemarket.ChainConfidence, func(code exitcode.ExitCode, bytes []byte) error { - if code == exitcode.Ok { - return ctx.Trigger(storagemarket.ClientEventFundsEnsured) + return node.WaitForMessage(ctx.Context(), deal.AddFundsCid, func(code exitcode.ExitCode, bytes []byte, err error) error { + if err != nil { + return ctx.Trigger(storagemarket.ClientEventEnsureFundsFailed, xerrors.Errorf("AddFunds err: %w", err)) + } + if code != exitcode.Ok { + return ctx.Trigger(storagemarket.ClientEventEnsureFundsFailed, xerrors.Errorf("AddFunds exit code: %s", code.String())) } - return ctx.Trigger(storagemarket.ClientEventEnsureFundsFailed, xerrors.Errorf("AddFunds exit code: %w", code)) + return ctx.Trigger(storagemarket.ClientEventFundsEnsured) + }) } diff --git a/storagemarket/impl/provider.go b/storagemarket/impl/provider.go index 61c3ddd8..8b49536d 100644 --- a/storagemarket/impl/provider.go +++ b/storagemarket/impl/provider.go @@ -256,11 +256,13 @@ func (p *Provider) AddStorageCollateral(ctx context.Context, amount abi.TokenAmo return err } - err = p.spn.WaitForMessage(mcid, storagemarket.ChainConfidence, func(code exitcode.ExitCode, bytes []byte) error { - if code == exitcode.Ok { - done <- nil + err = p.spn.WaitForMessage(ctx, mcid, func(code exitcode.ExitCode, bytes []byte, err error) error { + if err != nil { + done <- xerrors.Errorf("AddFunds errored: %w", err) + } else if code != exitcode.Ok { + done <- xerrors.Errorf("AddFunds error, exit code: %s", code.String()) } else { - done <- xerrors.Errorf("AddFunds error, exit code: %w", code) + done <- nil } return nil }) diff --git a/storagemarket/impl/providerstates/provider_states.go b/storagemarket/impl/providerstates/provider_states.go index c83412e6..9045f614 100644 --- a/storagemarket/impl/providerstates/provider_states.go +++ b/storagemarket/impl/providerstates/provider_states.go @@ -192,11 +192,14 @@ func EnsureProviderFunds(ctx fsm.Context, environment ProviderDealEnvironment, d func WaitForFunding(ctx fsm.Context, environment ProviderDealEnvironment, deal storagemarket.MinerDeal) error { node := environment.Node() - return node.WaitForMessage(deal.AddFundsCid, storagemarket.ChainConfidence, func(code exitcode.ExitCode, bytes []byte) error { - if code == exitcode.Ok { - return ctx.Trigger(storagemarket.ProviderEventFunded) + return node.WaitForMessage(ctx.Context(), deal.AddFundsCid, func(code exitcode.ExitCode, bytes []byte, err error) error { + if err != nil { + return ctx.Trigger(storagemarket.ProviderEventNodeErrored, xerrors.Errorf("AddFunds errored: %w", err)) + } + if code != exitcode.Ok { + return ctx.Trigger(storagemarket.ProviderEventNodeErrored, xerrors.Errorf("AddFunds exit code: %s", code.String())) } - return ctx.Trigger(storagemarket.ProviderEventNodeErrored, xerrors.Errorf("AddFunds exit code: %w", code)) + return ctx.Trigger(storagemarket.ProviderEventFunded) }) } @@ -220,32 +223,35 @@ func PublishDeal(ctx fsm.Context, environment ProviderDealEnvironment, deal stor // WaitForPublish waits for the publish message on chain and sends the deal id back to the client func WaitForPublish(ctx fsm.Context, environment ProviderDealEnvironment, deal storagemarket.MinerDeal) error { - return environment.Node().WaitForMessage(deal.PublishCid, storagemarket.ChainConfidence, func(code exitcode.ExitCode, retBytes []byte) error { - if code == exitcode.Ok { - var retval market.PublishStorageDealsReturn - err := retval.UnmarshalCBOR(bytes.NewReader(retBytes)) - if err != nil { - return err - } - - err = environment.SendSignedResponse(ctx.Context(), &network.Response{ - State: storagemarket.StorageDealProposalAccepted, - Proposal: deal.ProposalCid, - PublishMessage: &deal.PublishCid, - }) - - if err != nil { - return ctx.Trigger(storagemarket.ProviderEventSendResponseFailed, err) - } - - if err := environment.Disconnect(deal.ProposalCid); err != nil { - log.Warnf("closing client connection: %+v", err) - } - - return ctx.Trigger(storagemarket.ProviderEventDealPublished, retval.IDs[0]) + return environment.Node().WaitForMessage(ctx.Context(), deal.PublishCid, func(code exitcode.ExitCode, retBytes []byte, err error) error { + if err != nil { + return ctx.Trigger(storagemarket.ProviderEventDealPublishError, xerrors.Errorf("PublishStorageDeals errored: %w", err)) + } + if code != exitcode.Ok { + return ctx.Trigger(storagemarket.ProviderEventDealPublishError, xerrors.Errorf("PublishStorageDeals exit code: %s", code.String())) + } + var retval market.PublishStorageDealsReturn + err = retval.UnmarshalCBOR(bytes.NewReader(retBytes)) + if err != nil { + return ctx.Trigger(storagemarket.ProviderEventDealPublishError, xerrors.Errorf("PublishStorageDeals error unmarshalling result: %w", err)) + } + + err = environment.SendSignedResponse(ctx.Context(), &network.Response{ + State: storagemarket.StorageDealProposalAccepted, + Proposal: deal.ProposalCid, + PublishMessage: &deal.PublishCid, + }) + + if err != nil { + return ctx.Trigger(storagemarket.ProviderEventSendResponseFailed, err) } - return ctx.Trigger(storagemarket.ProviderEventDealPublishError, xerrors.Errorf("PublishStorageDeals exit code: %w", code)) + if err := environment.Disconnect(deal.ProposalCid); err != nil { + log.Warnf("closing client connection: %+v", err) + } + + return ctx.Trigger(storagemarket.ProviderEventDealPublished, retval.IDs[0]) + }) } diff --git a/storagemarket/testnodes/testnodes.go b/storagemarket/testnodes/testnodes.go index bcab0ca9..16aadb5d 100644 --- a/storagemarket/testnodes/testnodes.go +++ b/storagemarket/testnodes/testnodes.go @@ -93,10 +93,11 @@ type FakeCommonNode struct { GetBalanceError error GetChainHeadError error - WaitForMessageBlocks bool - WaitForMessageError error - WaitForMessageExitCode exitcode.ExitCode - WaitForMessageRetBytes []byte + WaitForMessageBlocks bool + WaitForMessageError error + WaitForMessageExitCode exitcode.ExitCode + WaitForMessageRetBytes []byte + WaitForMessageNodeError error } // GetChainHead returns the state id in the storage market state @@ -127,7 +128,7 @@ func (n *FakeCommonNode) EnsureFunds(ctx context.Context, addr, wallet address.A return cid.Undef, n.EnsureFundsError } -func (n *FakeCommonNode) WaitForMessage(mcid cid.Cid, confidence int64, onCompletion func(exitcode.ExitCode, []byte) error) error { +func (n *FakeCommonNode) WaitForMessage(ctx context.Context, mcid cid.Cid, onCompletion func(exitcode.ExitCode, []byte, error) error) error { if n.WaitForMessageError != nil { return n.WaitForMessageError } @@ -137,7 +138,7 @@ func (n *FakeCommonNode) WaitForMessage(mcid cid.Cid, confidence int64, onComple return nil } - return onCompletion(n.WaitForMessageExitCode, n.WaitForMessageRetBytes) + return onCompletion(n.WaitForMessageExitCode, n.WaitForMessageRetBytes, n.WaitForMessageNodeError) } // GetBalance returns the funds in the storage market state diff --git a/storagemarket/types.go b/storagemarket/types.go index c8c64f84..cc0d52e7 100644 --- a/storagemarket/types.go +++ b/storagemarket/types.go @@ -21,7 +21,6 @@ import ( const DealProtocolID = "/fil/storage/mk/1.0.1" const AskProtocolID = "/fil/storage/ask/1.0.1" -const ChainConfidence = 10 type Balance struct { Locked abi.TokenAmount @@ -361,7 +360,7 @@ type StorageFunds interface { // Verify a signature against an address + data VerifySignature(ctx context.Context, signature crypto.Signature, signer address.Address, plaintext []byte, tok shared.TipSetToken) (bool, error) - WaitForMessage(mcid cid.Cid, confidence int64, onCompletion func(exitcode.ExitCode, []byte) error) error + WaitForMessage(ctx context.Context, mcid cid.Cid, onCompletion func(exitcode.ExitCode, []byte, error) error) error } // Node dependencies for a StorageProvider