Skip to content

Commit

Permalink
Storage Market Changes Based On Lotus Integration (#223)
Browse files Browse the repository at this point in the history
* chore(deps): update go-ipfs-ds-help

update go-ipfs-ds-help and go-ipfs-blockstore to match lotus

* feat(storagemarket): cleanup WaitForMessage

Modify WaitForMessage signature -- as it turns out, we cannot wait for messages with confidences,
only state changes (for the most part)

* feat(storagemarket): add context to WaitForMessage

add a context to WaitForMessage since generally wait apis on node expect a context
  • Loading branch information
hannahhoward authored May 5, 2020
1 parent 1b12a92 commit 3e7b8cb
Show file tree
Hide file tree
Showing 9 changed files with 70 additions and 52 deletions.
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@ require (
github.com/ipfs/go-cid v0.0.5
github.com/ipfs/go-datastore v0.4.4
github.com/ipfs/go-graphsync v0.0.6-0.20200504202014-9d5f2c26a103
github.com/ipfs/go-ipfs-blockstore v0.1.4
github.com/ipfs/go-ipfs-blockstore v1.0.0
github.com/ipfs/go-ipfs-blocksutil v0.0.1
github.com/ipfs/go-ipfs-chunker v0.0.5
github.com/ipfs/go-ipfs-ds-help v0.1.1
github.com/ipfs/go-ipfs-ds-help v1.0.0
github.com/ipfs/go-ipfs-exchange-offline v0.0.1
github.com/ipfs/go-ipfs-files v0.0.8
github.com/ipfs/go-ipld-cbor v0.0.4
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,8 @@ github.com/ipfs/go-ipfs-blockstore v0.1.0/go.mod h1:5aD0AvHPi7mZc6Ci1WCAhiBQu2Is
github.com/ipfs/go-ipfs-blockstore v0.1.1/go.mod h1:8gZOgIN5e+Xdg2YSGdwTTRbguSVjYyosIDRQCY8E9QM=
github.com/ipfs/go-ipfs-blockstore v0.1.4 h1:2SGI6U1B44aODevza8Rde3+dY30Pb+lbcObe1LETxOQ=
github.com/ipfs/go-ipfs-blockstore v0.1.4/go.mod h1:Jxm3XMVjh6R17WvxFEiyKBLUGr86HgIYJW/D/MwqeYQ=
github.com/ipfs/go-ipfs-blockstore v1.0.0 h1:pmFp5sFYsYVvMOp9X01AK3s85usVcLvkBTRsN6SnfUA=
github.com/ipfs/go-ipfs-blockstore v1.0.0/go.mod h1:knLVdhVU9L7CC4T+T4nvGdeUIPAXlnd9zmXfp+9MIjU=
github.com/ipfs/go-ipfs-blocksutil v0.0.1 h1:Eh/H4pc1hsvhzsQoMEP3Bke/aW5P5rVM1IWFJMcGIPQ=
github.com/ipfs/go-ipfs-blocksutil v0.0.1/go.mod h1:Yq4M86uIOmxmGPUHv/uI7uKqZNtLb449gwKqXjIsnRk=
github.com/ipfs/go-ipfs-chunker v0.0.1 h1:cHUUxKFQ99pozdahi+uSC/3Y6HeRpi9oTeUHbE27SEw=
Expand All @@ -300,6 +302,8 @@ github.com/ipfs/go-ipfs-ds-help v0.0.1 h1:QBg+Ts2zgeemK/dB0saiF/ykzRGgfoFMT90Rzo
github.com/ipfs/go-ipfs-ds-help v0.0.1/go.mod h1:gtP9xRaZXqIQRh1HRpp595KbBEdgqWFxefeVKOV8sxo=
github.com/ipfs/go-ipfs-ds-help v0.1.1 h1:IW/bXGeaAZV2VH0Kuok+Ohva/zHkHmeLFBxC1k7mNPc=
github.com/ipfs/go-ipfs-ds-help v0.1.1/go.mod h1:SbBafGJuGsPI/QL3j9Fc5YPLeAu+SzOkI0gFwAg+mOs=
github.com/ipfs/go-ipfs-ds-help v1.0.0 h1:bEQ8hMGs80h0sR8O4tfDgV6B01aaF9qeTrujrTLYV3g=
github.com/ipfs/go-ipfs-ds-help v1.0.0/go.mod h1:ujAbkeIgkKAWtxxNkoZHWLCyk5JpPoKnGyCcsoF6ueE=
github.com/ipfs/go-ipfs-exchange-interface v0.0.1 h1:LJXIo9W7CAmugqI+uofioIpRb6rY30GUu7G6LUfpMvM=
github.com/ipfs/go-ipfs-exchange-interface v0.0.1/go.mod h1:c8MwfHjtQjPoDyiy9cFquVtVHkO9b9Ob3FG91qJnWCM=
github.com/ipfs/go-ipfs-exchange-offline v0.0.1 h1:P56jYKZF7lDDOLx5SotVh5KFxoY6C81I1NSHW1FxGew=
Expand Down
4 changes: 2 additions & 2 deletions retrievalmarket/discovery/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ func NewLocal(ds datastore.Batching) *Local {
}

func (l *Local) AddPeer(cid cid.Cid, peer retrievalmarket.RetrievalPeer) error {
key := dshelp.CidToDsKey(cid)
key := dshelp.MultihashToDsKey(cid.Hash())
exists, err := l.ds.Has(key)
if err != nil {
return err
Expand Down Expand Up @@ -64,7 +64,7 @@ func hasPeer(peerList []retrievalmarket.RetrievalPeer, peer retrievalmarket.Retr
}

func (l *Local) GetPeers(payloadCID cid.Cid) ([]retrievalmarket.RetrievalPeer, error) {
entry, err := l.ds.Get(dshelp.CidToDsKey(payloadCID))
entry, err := l.ds.Get(dshelp.MultihashToDsKey(payloadCID.Hash()))
if err == datastore.ErrNotFound {
return []retrievalmarket.RetrievalPeer{}, nil
}
Expand Down
10 changes: 6 additions & 4 deletions storagemarket/impl/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,11 +286,13 @@ func (c *Client) AddPaymentEscrow(ctx context.Context, addr address.Address, amo
return err
}

err = c.node.WaitForMessage(mcid, storagemarket.ChainConfidence, func(code exitcode.ExitCode, bytes []byte) error {
if code == exitcode.Ok {
done <- nil
err = c.node.WaitForMessage(ctx, mcid, func(code exitcode.ExitCode, bytes []byte, err error) error {
if err != nil {
done <- xerrors.Errorf("AddFunds errored: %w", err)
} else if code != exitcode.Ok {
done <- xerrors.Errorf("AddFunds error, exit code: %s", code.String())
} else {
done <- xerrors.Errorf("AddFunds error, exit code: %w", code)
done <- nil
}
return nil
})
Expand Down
12 changes: 8 additions & 4 deletions storagemarket/impl/clientstates/client_states.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,15 @@ func EnsureClientFunds(ctx fsm.Context, environment ClientDealEnvironment, deal
func WaitForFunding(ctx fsm.Context, environment ClientDealEnvironment, deal storagemarket.ClientDeal) error {
node := environment.Node()

return node.WaitForMessage(deal.AddFundsCid, storagemarket.ChainConfidence, func(code exitcode.ExitCode, bytes []byte) error {
if code == exitcode.Ok {
return ctx.Trigger(storagemarket.ClientEventFundsEnsured)
return node.WaitForMessage(ctx.Context(), deal.AddFundsCid, func(code exitcode.ExitCode, bytes []byte, err error) error {
if err != nil {
return ctx.Trigger(storagemarket.ClientEventEnsureFundsFailed, xerrors.Errorf("AddFunds err: %w", err))
}
if code != exitcode.Ok {
return ctx.Trigger(storagemarket.ClientEventEnsureFundsFailed, xerrors.Errorf("AddFunds exit code: %s", code.String()))
}
return ctx.Trigger(storagemarket.ClientEventEnsureFundsFailed, xerrors.Errorf("AddFunds exit code: %w", code))
return ctx.Trigger(storagemarket.ClientEventFundsEnsured)

})
}

Expand Down
10 changes: 6 additions & 4 deletions storagemarket/impl/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,11 +256,13 @@ func (p *Provider) AddStorageCollateral(ctx context.Context, amount abi.TokenAmo
return err
}

err = p.spn.WaitForMessage(mcid, storagemarket.ChainConfidence, func(code exitcode.ExitCode, bytes []byte) error {
if code == exitcode.Ok {
done <- nil
err = p.spn.WaitForMessage(ctx, mcid, func(code exitcode.ExitCode, bytes []byte, err error) error {
if err != nil {
done <- xerrors.Errorf("AddFunds errored: %w", err)
} else if code != exitcode.Ok {
done <- xerrors.Errorf("AddFunds error, exit code: %s", code.String())
} else {
done <- xerrors.Errorf("AddFunds error, exit code: %w", code)
done <- nil
}
return nil
})
Expand Down
62 changes: 34 additions & 28 deletions storagemarket/impl/providerstates/provider_states.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,11 +192,14 @@ func EnsureProviderFunds(ctx fsm.Context, environment ProviderDealEnvironment, d
func WaitForFunding(ctx fsm.Context, environment ProviderDealEnvironment, deal storagemarket.MinerDeal) error {
node := environment.Node()

return node.WaitForMessage(deal.AddFundsCid, storagemarket.ChainConfidence, func(code exitcode.ExitCode, bytes []byte) error {
if code == exitcode.Ok {
return ctx.Trigger(storagemarket.ProviderEventFunded)
return node.WaitForMessage(ctx.Context(), deal.AddFundsCid, func(code exitcode.ExitCode, bytes []byte, err error) error {
if err != nil {
return ctx.Trigger(storagemarket.ProviderEventNodeErrored, xerrors.Errorf("AddFunds errored: %w", err))
}
if code != exitcode.Ok {
return ctx.Trigger(storagemarket.ProviderEventNodeErrored, xerrors.Errorf("AddFunds exit code: %s", code.String()))
}
return ctx.Trigger(storagemarket.ProviderEventNodeErrored, xerrors.Errorf("AddFunds exit code: %w", code))
return ctx.Trigger(storagemarket.ProviderEventFunded)
})
}

Expand All @@ -220,32 +223,35 @@ func PublishDeal(ctx fsm.Context, environment ProviderDealEnvironment, deal stor

// WaitForPublish waits for the publish message on chain and sends the deal id back to the client
func WaitForPublish(ctx fsm.Context, environment ProviderDealEnvironment, deal storagemarket.MinerDeal) error {
return environment.Node().WaitForMessage(deal.PublishCid, storagemarket.ChainConfidence, func(code exitcode.ExitCode, retBytes []byte) error {
if code == exitcode.Ok {
var retval market.PublishStorageDealsReturn
err := retval.UnmarshalCBOR(bytes.NewReader(retBytes))
if err != nil {
return err
}

err = environment.SendSignedResponse(ctx.Context(), &network.Response{
State: storagemarket.StorageDealProposalAccepted,
Proposal: deal.ProposalCid,
PublishMessage: &deal.PublishCid,
})

if err != nil {
return ctx.Trigger(storagemarket.ProviderEventSendResponseFailed, err)
}

if err := environment.Disconnect(deal.ProposalCid); err != nil {
log.Warnf("closing client connection: %+v", err)
}

return ctx.Trigger(storagemarket.ProviderEventDealPublished, retval.IDs[0])
return environment.Node().WaitForMessage(ctx.Context(), deal.PublishCid, func(code exitcode.ExitCode, retBytes []byte, err error) error {
if err != nil {
return ctx.Trigger(storagemarket.ProviderEventDealPublishError, xerrors.Errorf("PublishStorageDeals errored: %w", err))
}
if code != exitcode.Ok {
return ctx.Trigger(storagemarket.ProviderEventDealPublishError, xerrors.Errorf("PublishStorageDeals exit code: %s", code.String()))
}
var retval market.PublishStorageDealsReturn
err = retval.UnmarshalCBOR(bytes.NewReader(retBytes))
if err != nil {
return ctx.Trigger(storagemarket.ProviderEventDealPublishError, xerrors.Errorf("PublishStorageDeals error unmarshalling result: %w", err))
}

err = environment.SendSignedResponse(ctx.Context(), &network.Response{
State: storagemarket.StorageDealProposalAccepted,
Proposal: deal.ProposalCid,
PublishMessage: &deal.PublishCid,
})

if err != nil {
return ctx.Trigger(storagemarket.ProviderEventSendResponseFailed, err)
}

return ctx.Trigger(storagemarket.ProviderEventDealPublishError, xerrors.Errorf("PublishStorageDeals exit code: %w", code))
if err := environment.Disconnect(deal.ProposalCid); err != nil {
log.Warnf("closing client connection: %+v", err)
}

return ctx.Trigger(storagemarket.ProviderEventDealPublished, retval.IDs[0])

})
}

Expand Down
13 changes: 7 additions & 6 deletions storagemarket/testnodes/testnodes.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,10 +93,11 @@ type FakeCommonNode struct {
GetBalanceError error
GetChainHeadError error

WaitForMessageBlocks bool
WaitForMessageError error
WaitForMessageExitCode exitcode.ExitCode
WaitForMessageRetBytes []byte
WaitForMessageBlocks bool
WaitForMessageError error
WaitForMessageExitCode exitcode.ExitCode
WaitForMessageRetBytes []byte
WaitForMessageNodeError error
}

// GetChainHead returns the state id in the storage market state
Expand Down Expand Up @@ -127,7 +128,7 @@ func (n *FakeCommonNode) EnsureFunds(ctx context.Context, addr, wallet address.A
return cid.Undef, n.EnsureFundsError
}

func (n *FakeCommonNode) WaitForMessage(mcid cid.Cid, confidence int64, onCompletion func(exitcode.ExitCode, []byte) error) error {
func (n *FakeCommonNode) WaitForMessage(ctx context.Context, mcid cid.Cid, onCompletion func(exitcode.ExitCode, []byte, error) error) error {
if n.WaitForMessageError != nil {
return n.WaitForMessageError
}
Expand All @@ -137,7 +138,7 @@ func (n *FakeCommonNode) WaitForMessage(mcid cid.Cid, confidence int64, onComple
return nil
}

return onCompletion(n.WaitForMessageExitCode, n.WaitForMessageRetBytes)
return onCompletion(n.WaitForMessageExitCode, n.WaitForMessageRetBytes, n.WaitForMessageNodeError)
}

// GetBalance returns the funds in the storage market state
Expand Down
3 changes: 1 addition & 2 deletions storagemarket/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (

const DealProtocolID = "/fil/storage/mk/1.0.1"
const AskProtocolID = "/fil/storage/ask/1.0.1"
const ChainConfidence = 10

type Balance struct {
Locked abi.TokenAmount
Expand Down Expand Up @@ -361,7 +360,7 @@ type StorageFunds interface {
// Verify a signature against an address + data
VerifySignature(ctx context.Context, signature crypto.Signature, signer address.Address, plaintext []byte, tok shared.TipSetToken) (bool, error)

WaitForMessage(mcid cid.Cid, confidence int64, onCompletion func(exitcode.ExitCode, []byte) error) error
WaitForMessage(ctx context.Context, mcid cid.Cid, onCompletion func(exitcode.ExitCode, []byte, error) error) error
}

// Node dependencies for a StorageProvider
Expand Down

0 comments on commit 3e7b8cb

Please sign in to comment.