Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Timeout provider after waiting a period of time for transfer to restart #655

Merged
merged 3 commits into from
Dec 17, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions docs/storageprovider.mmd
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,15 @@ stateDiagram-v2
22 : On entry runs WaitForFunding
24 : On entry runs PublishDeal
25 : On entry runs WaitForPublish
27 : On entry runs WaitForTransferRestart
29 : On entry runs VerifyDealPreCommitted
[*] --> 0
note right of 0
The following events are not shown cause they can trigger from any state.

ProviderEventNodeErrored - transitions state to StorageDealFailing
ProviderEventRestart - does not transition state
ProviderEventAwaitTransferRestartTimeout - just records
end note
0 --> 14 : ProviderEventOpen
14 --> 10 : ProviderEventDealRejected
Expand Down Expand Up @@ -91,6 +93,7 @@ stateDiagram-v2
14 --> 26 : ProviderEventRestart
15 --> 26 : ProviderEventRestart
17 --> 27 : ProviderEventRestart
27 --> 11 : ProviderEventAwaitTransferRestartTimeout
20 --> 11 : ProviderEventTrackFundsFailed

note left of 4 : The following events only record in this state.<br><br>ProviderEventPieceStoreErrored
Expand Down
Binary file modified docs/storageprovider.mmd.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
6 changes: 3 additions & 3 deletions docs/storageprovider.mmd.svg
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
89 changes: 47 additions & 42 deletions storagemarket/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,52 +288,57 @@ const (

// ProviderEventDataTransferCancelled happens when a data transfer is cancelled
ProviderEventDataTransferCancelled

// ProviderEventAwaitTransferRestartTimeout is dispatched after a certain amount of time a provider has been
// waiting for a data transfer to restart. If transfer hasn't restarted, the provider will fail the deal
ProviderEventAwaitTransferRestartTimeout
)

// ProviderEvents maps provider event codes to string names
var ProviderEvents = map[ProviderEvent]string{
ProviderEventOpen: "ProviderEventOpen",
ProviderEventNodeErrored: "ProviderEventNodeErrored",
ProviderEventDealRejected: "ProviderEventDealRejected",
ProviderEventRejectionSent: "ProviderEventRejectionSent",
ProviderEventDealAccepted: "ProviderEventDealAccepted",
ProviderEventDealDeciding: "ProviderEventDealDeciding",
ProviderEventInsufficientFunds: "ProviderEventInsufficientFunds",
ProviderEventFundsReserved: "ProviderEventFundsReserved",
ProviderEventFundsReleased: "ProviderEventFundsReleased",
ProviderEventFundingInitiated: "ProviderEventFundingInitiated",
ProviderEventFunded: "ProviderEventFunded",
ProviderEventDataTransferFailed: "ProviderEventDataTransferFailed",
ProviderEventDataRequested: "ProviderEventDataRequested",
ProviderEventDataTransferInitiated: "ProviderEventDataTransferInitiated",
ProviderEventDataTransferCompleted: "ProviderEventDataTransferCompleted",
ProviderEventManualDataReceived: "ProviderEventManualDataReceived",
ProviderEventDataVerificationFailed: "ProviderEventDataVerificationFailed",
ProviderEventVerifiedData: "ProviderEventVerifiedData",
ProviderEventSendResponseFailed: "ProviderEventSendResponseFailed",
ProviderEventDealPublishInitiated: "ProviderEventDealPublishInitiated",
ProviderEventDealPublished: "ProviderEventDealPublished",
ProviderEventDealPublishError: "ProviderEventDealPublishError",
ProviderEventFileStoreErrored: "ProviderEventFileStoreErrored",
ProviderEventDealHandoffFailed: "ProviderEventDealHandoffFailed",
ProviderEventDealHandedOff: "ProviderEventDealHandedOff",
ProviderEventDealActivationFailed: "ProviderEventDealActivationFailed",
ProviderEventDealActivated: "ProviderEventDealActivated",
ProviderEventPieceStoreErrored: "ProviderEventPieceStoreErrored",
ProviderEventFinalized: "ProviderEventCleanupFinished",
ProviderEventDealCompletionFailed: "ProviderEventDealCompletionFailed",
ProviderEventMultistoreErrored: "ProviderEventMultistoreErrored",
ProviderEventDealExpired: "ProviderEventDealExpired",
ProviderEventDealSlashed: "ProviderEventDealSlashed",
ProviderEventFailed: "ProviderEventFailed",
ProviderEventTrackFundsFailed: "ProviderEventTrackFundsFailed",
ProviderEventRestart: "ProviderEventRestart",
ProviderEventDataTransferRestarted: "ProviderEventDataTransferRestarted",
ProviderEventDataTransferRestartFailed: "ProviderEventDataTransferRestartFailed",
ProviderEventDataTransferStalled: "ProviderEventDataTransferStalled",
ProviderEventDataTransferCancelled: "ProviderEventDataTransferCancelled",
ProviderEventDealPrecommitFailed: "ProviderEventDealPrecommitFailed",
ProviderEventDealPrecommitted: "ProviderEventDealPrecommitted",
ProviderEventOpen: "ProviderEventOpen",
ProviderEventNodeErrored: "ProviderEventNodeErrored",
ProviderEventDealRejected: "ProviderEventDealRejected",
ProviderEventRejectionSent: "ProviderEventRejectionSent",
ProviderEventDealAccepted: "ProviderEventDealAccepted",
ProviderEventDealDeciding: "ProviderEventDealDeciding",
ProviderEventInsufficientFunds: "ProviderEventInsufficientFunds",
ProviderEventFundsReserved: "ProviderEventFundsReserved",
ProviderEventFundsReleased: "ProviderEventFundsReleased",
ProviderEventFundingInitiated: "ProviderEventFundingInitiated",
ProviderEventFunded: "ProviderEventFunded",
ProviderEventDataTransferFailed: "ProviderEventDataTransferFailed",
ProviderEventDataRequested: "ProviderEventDataRequested",
ProviderEventDataTransferInitiated: "ProviderEventDataTransferInitiated",
ProviderEventDataTransferCompleted: "ProviderEventDataTransferCompleted",
ProviderEventManualDataReceived: "ProviderEventManualDataReceived",
ProviderEventDataVerificationFailed: "ProviderEventDataVerificationFailed",
ProviderEventVerifiedData: "ProviderEventVerifiedData",
ProviderEventSendResponseFailed: "ProviderEventSendResponseFailed",
ProviderEventDealPublishInitiated: "ProviderEventDealPublishInitiated",
ProviderEventDealPublished: "ProviderEventDealPublished",
ProviderEventDealPublishError: "ProviderEventDealPublishError",
ProviderEventFileStoreErrored: "ProviderEventFileStoreErrored",
ProviderEventDealHandoffFailed: "ProviderEventDealHandoffFailed",
ProviderEventDealHandedOff: "ProviderEventDealHandedOff",
ProviderEventDealActivationFailed: "ProviderEventDealActivationFailed",
ProviderEventDealActivated: "ProviderEventDealActivated",
ProviderEventPieceStoreErrored: "ProviderEventPieceStoreErrored",
ProviderEventFinalized: "ProviderEventCleanupFinished",
ProviderEventDealCompletionFailed: "ProviderEventDealCompletionFailed",
ProviderEventMultistoreErrored: "ProviderEventMultistoreErrored",
ProviderEventDealExpired: "ProviderEventDealExpired",
ProviderEventDealSlashed: "ProviderEventDealSlashed",
ProviderEventFailed: "ProviderEventFailed",
ProviderEventTrackFundsFailed: "ProviderEventTrackFundsFailed",
ProviderEventRestart: "ProviderEventRestart",
ProviderEventDataTransferRestarted: "ProviderEventDataTransferRestarted",
ProviderEventDataTransferRestartFailed: "ProviderEventDataTransferRestartFailed",
ProviderEventDataTransferStalled: "ProviderEventDataTransferStalled",
ProviderEventDataTransferCancelled: "ProviderEventDataTransferCancelled",
ProviderEventDealPrecommitFailed: "ProviderEventDealPrecommitFailed",
ProviderEventDealPrecommitted: "ProviderEventDealPrecommitted",
ProviderEventAwaitTransferRestartTimeout: "ProviderEventAwaitTransferRestartTimeout",
}

func (e ProviderEvent) String() string {
Expand Down
58 changes: 36 additions & 22 deletions storagemarket/impl/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"io"
"os"
"time"

"github.com/hannahhoward/go-pubsub"
"github.com/ipfs/go-cid"
Expand Down Expand Up @@ -42,6 +43,8 @@ import (
var _ storagemarket.StorageProvider = &Provider{}
var _ network.StorageReceiver = &Provider{}

const defaultAwaitRestartTimeout = 1 * time.Hour

// StoredAsk is an interface which provides access to a StorageAsk
type StoredAsk interface {
GetAsk() *storagemarket.SignedStorageAsk
Expand All @@ -52,16 +55,17 @@ type StoredAsk interface {
type Provider struct {
net network.StorageMarketNetwork

spn storagemarket.StorageProviderNode
fs filestore.FileStore
pieceStore piecestore.PieceStore
conns *connmanager.ConnManager
storedAsk StoredAsk
actor address.Address
dataTransfer datatransfer.Manager
customDealDeciderFunc DealDeciderFunc
pubSub *pubsub.PubSub
readyMgr *shared.ReadyManager
spn storagemarket.StorageProviderNode
fs filestore.FileStore
pieceStore piecestore.PieceStore
conns *connmanager.ConnManager
storedAsk StoredAsk
actor address.Address
dataTransfer datatransfer.Manager
customDealDeciderFunc DealDeciderFunc
awaitTransferRestartTimeout time.Duration
pubSub *pubsub.PubSub
readyMgr *shared.ReadyManager

deals fsm.Group
migrateDeals func(context.Context) error
Expand Down Expand Up @@ -91,6 +95,15 @@ func CustomDealDecisionLogic(decider DealDeciderFunc) StorageProviderOption {
}
}

// AwaitTransferRestartTimeout sets the maximum amount of time a provider will
// wait for a client to restart a data transfer when the node starts up before
// failing the deal
func AwaitTransferRestartTimeout(waitTime time.Duration) StorageProviderOption {
return func(p *Provider) {
p.awaitTransferRestartTimeout = waitTime
}
}

// NewProvider returns a new storage provider
func NewProvider(net network.StorageMarketNetwork,
ds datastore.Batching,
Expand All @@ -104,18 +117,19 @@ func NewProvider(net network.StorageMarketNetwork,
options ...StorageProviderOption,
) (storagemarket.StorageProvider, error) {
h := &Provider{
net: net,
spn: spn,
fs: fs,
pieceStore: pieceStore,
conns: connmanager.NewConnManager(),
storedAsk: storedAsk,
actor: minerAddress,
dataTransfer: dataTransfer,
pubSub: pubsub.New(providerDispatcher),
readyMgr: shared.NewReadyManager(),
dagStore: dagStore,
stores: stores.NewReadWriteBlockstores(),
net: net,
spn: spn,
fs: fs,
pieceStore: pieceStore,
conns: connmanager.NewConnManager(),
storedAsk: storedAsk,
actor: minerAddress,
dataTransfer: dataTransfer,
pubSub: pubsub.New(providerDispatcher),
readyMgr: shared.NewReadyManager(),
dagStore: dagStore,
stores: stores.NewReadWriteBlockstores(),
awaitTransferRestartTimeout: defaultAwaitRestartTimeout,
}
storageMigrations, err := migrations.ProviderMigrations.Build()
if err != nil {
Expand Down
6 changes: 6 additions & 0 deletions storagemarket/impl/provider_environments.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"io"
"os"
"time"

"github.com/ipfs/go-cid"
bstore "github.com/ipfs/go-ipfs-blockstore"
Expand Down Expand Up @@ -190,6 +191,11 @@ func (p *providerDealEnvironment) UntagPeer(id peer.ID, s string) {
p.p.net.UntagPeer(id, s)
}

func (p *providerDealEnvironment) AwaitRestartTimeout() <-chan time.Time {
timer := time.NewTimer(p.p.awaitTransferRestartTimeout)
return timer.C
}

var _ providerstates.ProviderDealEnvironment = &providerDealEnvironment{}

type providerStoreGetter struct {
Expand Down
40 changes: 26 additions & 14 deletions storagemarket/impl/providerstates/provider_fsm.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package providerstates

import (
"fmt"

"github.com/ipfs/go-cid"
"golang.org/x/xerrors"

Expand Down Expand Up @@ -212,6 +214,15 @@ var ProviderEvents = fsm.Events{
To(storagemarket.StorageDealProviderTransferAwaitRestart).
FromAny().ToNoChange(),

fsm.Event(storagemarket.ProviderEventAwaitTransferRestartTimeout).
From(storagemarket.StorageDealProviderTransferAwaitRestart).To(storagemarket.StorageDealFailing).
FromAny().ToJustRecord().
Action(func(deal *storagemarket.MinerDeal) error {
if deal.State == storagemarket.StorageDealProviderTransferAwaitRestart {
deal.Message = fmt.Sprintf("timed out waiting for client to restart transfer")
}
return nil
}),
fsm.Event(storagemarket.ProviderEventTrackFundsFailed).
From(storagemarket.StorageDealReserveProviderFunds).To(storagemarket.StorageDealFailing).
Action(func(deal *storagemarket.MinerDeal, err error) error {
Expand All @@ -238,20 +249,21 @@ var ProviderEvents = fsm.Events{

// ProviderStateEntryFuncs are the handlers for different states in a storage client
var ProviderStateEntryFuncs = fsm.StateEntryFuncs{
storagemarket.StorageDealValidating: ValidateDealProposal,
storagemarket.StorageDealAcceptWait: DecideOnProposal,
storagemarket.StorageDealVerifyData: VerifyData,
storagemarket.StorageDealReserveProviderFunds: ReserveProviderFunds,
storagemarket.StorageDealProviderFunding: WaitForFunding,
storagemarket.StorageDealPublish: PublishDeal,
storagemarket.StorageDealPublishing: WaitForPublish,
storagemarket.StorageDealStaged: HandoffDeal,
storagemarket.StorageDealAwaitingPreCommit: VerifyDealPreCommitted,
storagemarket.StorageDealSealing: VerifyDealActivated,
storagemarket.StorageDealRejecting: RejectDeal,
storagemarket.StorageDealFinalizing: CleanupDeal,
storagemarket.StorageDealActive: WaitForDealCompletion,
storagemarket.StorageDealFailing: FailDeal,
storagemarket.StorageDealValidating: ValidateDealProposal,
storagemarket.StorageDealAcceptWait: DecideOnProposal,
storagemarket.StorageDealProviderTransferAwaitRestart: WaitForTransferRestart,
storagemarket.StorageDealVerifyData: VerifyData,
storagemarket.StorageDealReserveProviderFunds: ReserveProviderFunds,
storagemarket.StorageDealProviderFunding: WaitForFunding,
storagemarket.StorageDealPublish: PublishDeal,
storagemarket.StorageDealPublishing: WaitForPublish,
storagemarket.StorageDealStaged: HandoffDeal,
storagemarket.StorageDealAwaitingPreCommit: VerifyDealPreCommitted,
storagemarket.StorageDealSealing: VerifyDealActivated,
storagemarket.StorageDealRejecting: RejectDeal,
storagemarket.StorageDealFinalizing: CleanupDeal,
storagemarket.StorageDealActive: WaitForDealCompletion,
storagemarket.StorageDealFailing: FailDeal,
}

// ProviderFinalityStates are the states that terminate deal processing for a deal.
Expand Down
17 changes: 17 additions & 0 deletions storagemarket/impl/providerstates/provider_states.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"io"
"strings"
"time"

"github.com/ipfs/go-cid"
logging "github.com/ipfs/go-log/v2"
Expand Down Expand Up @@ -53,6 +54,7 @@ type ProviderDealEnvironment interface {
FileStore() filestore.FileStore
PieceStore() piecestore.PieceStore
RunCustomDecisionLogic(context.Context, storagemarket.MinerDeal) (bool, string, error)
AwaitRestartTimeout() <-chan time.Time
network.PeerTagger
}

Expand Down Expand Up @@ -208,6 +210,21 @@ func DecideOnProposal(ctx fsm.Context, environment ProviderDealEnvironment, deal
return ctx.Trigger(storagemarket.ProviderEventDataRequested)
}

// WaitForTransferRestart fires a timeout after a set amount of time. If the restart hasn't started at this point,
// the transfer fails
func WaitForTransferRestart(ctx fsm.Context, environment ProviderDealEnvironment, deal storagemarket.MinerDeal) error {

timeout := environment.AwaitRestartTimeout()
go func() {
select {
case <-ctx.Context().Done():
case <-timeout:
ctx.Trigger(storagemarket.ProviderEventAwaitTransferRestartTimeout)
}
}()
return nil
}

// VerifyData verifies that data received for a deal matches the pieceCID
// in the proposal
func VerifyData(ctx fsm.Context, environment ProviderDealEnvironment, deal storagemarket.MinerDeal) error {
Expand Down
Loading