Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Better storage fsm error handling #484

Merged
merged 3 commits into from
Feb 5, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 43 additions & 13 deletions storagemarket/impl/providerstates/provider_states.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package providerstates
import (
"bytes"
"context"
"errors"
"fmt"
"io"

Expand Down Expand Up @@ -297,40 +296,71 @@ func WaitForPublish(ctx fsm.Context, environment ProviderDealEnvironment, deal s

// HandoffDeal hands off a published deal for sealing and commitment in a sector
func HandoffDeal(ctx fsm.Context, environment ProviderDealEnvironment, deal storagemarket.MinerDeal) error {
triggerHandoffFailed := func(err error, packingErr error) error {
if packingErr == nil {
return ctx.Trigger(storagemarket.ProviderEventDealHandoffFailed, err)
}
packingErr = xerrors.Errorf("packing error: %w", packingErr)
err = xerrors.Errorf("%s: %w", err, packingErr)
return ctx.Trigger(storagemarket.ProviderEventDealHandoffFailed, err)
}

var packingInfo *storagemarket.PackingResult
var packingErr error
if deal.PiecePath != filestore.Path("") {
if deal.PiecePath != "" {
// In earlier versions of lotus the piece data was written to disk, so
// if PiecePath is set, create a Reader from the file path.
dirkmc marked this conversation as resolved.
Show resolved Hide resolved
file, err := environment.FileStore().Open(deal.PiecePath)
if err != nil {
return ctx.Trigger(storagemarket.ProviderEventFileStoreErrored, xerrors.Errorf("reading piece at path %s: %w", deal.PiecePath, err))
return ctx.Trigger(storagemarket.ProviderEventFileStoreErrored,
xerrors.Errorf("reading piece at path %s: %w", deal.PiecePath, err))
}

// Hand the deal off to the process that adds it to a sector
packingInfo, err = handoffDeal(ctx.Context(), environment, deal, file, uint64(file.Size()))
if err != nil {
err = xerrors.Errorf("packing piece at path %s: %w", deal.PiecePath, err)
return ctx.Trigger(storagemarket.ProviderEventDealHandoffFailed, err)
}
packingInfo, packingErr = handoffDeal(ctx.Context(), environment, deal, file, uint64(file.Size()))
} else {
// Create a reader to read the piece from the blockstore
pieceReader, pieceSize, err, writeErrChan := environment.GeneratePieceReader(deal.StoreID, deal.Ref.Root, shared.AllSelector())
if err != nil {
err := xerrors.Errorf("reading piece %s from store %d: %w", deal.Ref.Root, deal.StoreID, err)
return ctx.Trigger(storagemarket.ProviderEventDealHandoffFailed, err)
}

// Hand the deal off to the process that adds it to a sector
var packingErr error
packingInfo, packingErr = handoffDeal(ctx.Context(), environment, deal, pieceReader, pieceSize)

// Close the read side of the pipe
err = pieceReader.Close()
if err != nil {
return ctx.Trigger(storagemarket.ProviderEventDealHandoffFailed, err)
err = xerrors.Errorf("closing reader for piece %s from store %d: %w", deal.Ref.Root, deal.StoreID, err)
return triggerHandoffFailed(err, packingErr)
}

// Wait for the write to complete
select {
case <-ctx.Context().Done():
return ctx.Trigger(storagemarket.ProviderEventDealHandoffFailed, errors.New("write never finished"))
return ctx.Trigger(storagemarket.ProviderEventDealHandoffFailed,
xerrors.Errorf("writing piece %s never finished: %w", deal.Ref.Root, ctx.Context().Err()))
case err = <-writeErrChan:
if err != nil {
err = xerrors.Errorf("writing piece %s: %w", deal.Ref.Root, err)
return triggerHandoffFailed(err, packingErr)
}
}
if err != nil {

if packingErr != nil {
err = xerrors.Errorf("packing piece %s: %w", deal.Ref.Root, packingErr)
return ctx.Trigger(storagemarket.ProviderEventDealHandoffFailed, err)
}
}

if packingErr != nil {
return ctx.Trigger(storagemarket.ProviderEventDealHandoffFailed, packingErr)
}

if err := recordPiece(environment, deal, packingInfo.SectorNumber, packingInfo.Offset, packingInfo.Size); err != nil {
log.Errorf("failed to register deal data for retrieval: %s", err)
err = xerrors.Errorf("failed to register deal data for piece %s for retrieval: %w", deal.Ref.Root, err)
log.Error(err.Error())
_ = ctx.Trigger(storagemarket.ProviderEventPieceStoreErrored, err)
}

Expand Down
44 changes: 37 additions & 7 deletions storagemarket/impl/providerstates/provider_states_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -650,7 +650,7 @@ func TestHandoffDeal(t *testing.T) {
},
dealInspector: func(t *testing.T, deal storagemarket.MinerDeal, env *fakeEnvironment) {
tut.AssertDealState(t, storagemarket.StorageDealAwaitingPreCommit, deal.State)
require.Equal(t, fmt.Sprintf("recording piece for retrieval: failed to load block locations: file not found"), deal.Message)
require.Equal(t, fmt.Sprintf("recording piece for retrieval: failed to register deal data for piece %s for retrieval: failed to load block locations: file not found", deal.Ref.Root), deal.Message)
},
},
"add piece block locations errors": {
Expand All @@ -667,7 +667,7 @@ func TestHandoffDeal(t *testing.T) {
},
dealInspector: func(t *testing.T, deal storagemarket.MinerDeal, env *fakeEnvironment) {
tut.AssertDealState(t, storagemarket.StorageDealAwaitingPreCommit, deal.State)
require.Equal(t, "recording piece for retrieval: failed to add piece block locations: could not add block locations", deal.Message)
require.Equal(t, fmt.Sprintf("recording piece for retrieval: failed to register deal data for piece %s for retrieval: failed to add piece block locations: could not add block locations", deal.Ref.Root), deal.Message)
},
},
"add deal for piece errors": {
Expand All @@ -684,7 +684,7 @@ func TestHandoffDeal(t *testing.T) {
},
dealInspector: func(t *testing.T, deal storagemarket.MinerDeal, env *fakeEnvironment) {
tut.AssertDealState(t, storagemarket.StorageDealAwaitingPreCommit, deal.State)
require.Equal(t, "recording piece for retrieval: failed to add deal for piece: could not add deal info", deal.Message)
require.Equal(t, fmt.Sprintf("recording piece for retrieval: failed to register deal data for piece %s for retrieval: failed to add deal for piece: could not add deal info", deal.Ref.Root), deal.Message)
},
},
"opening file errors": {
Expand All @@ -709,7 +709,7 @@ func TestHandoffDeal(t *testing.T) {
},
dealInspector: func(t *testing.T, deal storagemarket.MinerDeal, env *fakeEnvironment) {
tut.AssertDealState(t, storagemarket.StorageDealFailing, deal.State)
require.Equal(t, "handing off deal to node: failed building sector", deal.Message)
require.Equal(t, "handing off deal to node: packing piece at path file.txt: failed building sector", deal.Message)
},
},
"assemble piece on demand fails immediately": {
Expand All @@ -721,7 +721,7 @@ func TestHandoffDeal(t *testing.T) {
},
dealInspector: func(t *testing.T, deal storagemarket.MinerDeal, env *fakeEnvironment) {
tut.AssertDealState(t, storagemarket.StorageDealFailing, deal.State)
require.Equal(t, "handing off deal to node: something went wrong", deal.Message)
require.Equal(t, fmt.Sprintf("handing off deal to node: reading piece %s from store %d: something went wrong", deal.Ref.Root, deal.StoreID), deal.Message)
},
},
"assemble piece on demand fails async": {
Expand All @@ -733,7 +733,7 @@ func TestHandoffDeal(t *testing.T) {
},
dealInspector: func(t *testing.T, deal storagemarket.MinerDeal, env *fakeEnvironment) {
tut.AssertDealState(t, storagemarket.StorageDealFailing, deal.State)
require.Equal(t, "handing off deal to node: something went wrong", deal.Message)
require.Equal(t, fmt.Sprintf("handing off deal to node: writing piece %s: something went wrong", deal.Ref.Root), deal.Message)
},
},
"assemble piece on demand fails closing reader": {
Expand All @@ -745,7 +745,37 @@ func TestHandoffDeal(t *testing.T) {
},
dealInspector: func(t *testing.T, deal storagemarket.MinerDeal, env *fakeEnvironment) {
tut.AssertDealState(t, storagemarket.StorageDealFailing, deal.State)
require.Equal(t, "handing off deal to node: something went wrong", deal.Message)
require.Equal(t, fmt.Sprintf("handing off deal to node: closing reader for piece %s from store %d: something went wrong", deal.Ref.Root, deal.StoreID), deal.Message)
},
},
"assemble piece on demand fails closing reader and OnComplete fails": {
environmentParams: environmentParams{
PieceReader: newStubbedReadCloser(errors.New("close reader failed")),
},
dealParams: dealParams{
FastRetrieval: true,
},
nodeParams: nodeParams{
OnDealCompleteError: errors.New("failed building sector"),
},
dealInspector: func(t *testing.T, deal storagemarket.MinerDeal, env *fakeEnvironment) {
tut.AssertDealState(t, storagemarket.StorageDealFailing, deal.State)
require.Equal(t, fmt.Sprintf("handing off deal to node: closing reader for piece %s from store %d: close reader failed: packing error: failed building sector", deal.Ref.Root, deal.StoreID), deal.Message)
},
},
"assemble piece on demand fails async and OnComplete fails": {
environmentParams: environmentParams{
GeneratePieceReaderErrAsync: errors.New("async err"),
},
dealParams: dealParams{
FastRetrieval: true,
},
nodeParams: nodeParams{
OnDealCompleteError: errors.New("failed building sector"),
},
dealInspector: func(t *testing.T, deal storagemarket.MinerDeal, env *fakeEnvironment) {
tut.AssertDealState(t, storagemarket.StorageDealFailing, deal.State)
require.Equal(t, fmt.Sprintf("handing off deal to node: writing piece %s: async err: packing error: failed building sector", deal.Ref.Root), deal.Message)
},
},
}
Expand Down