From 91cb80b1a1aef7710a1f77a3e38d86afc74d7d21 Mon Sep 17 00:00:00 2001 From: Dirk McCormick Date: Thu, 4 Feb 2021 11:31:31 +0100 Subject: [PATCH 1/2] fix: better storage fsm error handling --- .../impl/providerstates/provider_states.go | 56 ++++++++++++++----- .../providerstates/provider_states_test.go | 44 ++++++++++++--- 2 files changed, 80 insertions(+), 20 deletions(-) diff --git a/storagemarket/impl/providerstates/provider_states.go b/storagemarket/impl/providerstates/provider_states.go index cf8a2ccb..c1172d6d 100644 --- a/storagemarket/impl/providerstates/provider_states.go +++ b/storagemarket/impl/providerstates/provider_states.go @@ -3,7 +3,6 @@ package providerstates import ( "bytes" "context" - "errors" "fmt" "io" @@ -297,40 +296,71 @@ func WaitForPublish(ctx fsm.Context, environment ProviderDealEnvironment, deal s // HandoffDeal hands off a published deal for sealing and commitment in a sector func HandoffDeal(ctx fsm.Context, environment ProviderDealEnvironment, deal storagemarket.MinerDeal) error { + triggerHandoffFailed := func(err error, packingErr error) error { + if packingErr == nil { + return ctx.Trigger(storagemarket.ProviderEventDealHandoffFailed, err) + } + packingErr = xerrors.Errorf("packing error: %w", packingErr) + err = xerrors.Errorf("%s: %w", err, packingErr) + return ctx.Trigger(storagemarket.ProviderEventDealHandoffFailed, err) + } + var packingInfo *storagemarket.PackingResult - var packingErr error - if deal.PiecePath != filestore.Path("") { + if deal.PiecePath != "" { + // In earlier versions of lotus the piece data was written to disk, so + // if PiecePath is set, create a Reader from the file path. file, err := environment.FileStore().Open(deal.PiecePath) if err != nil { - return ctx.Trigger(storagemarket.ProviderEventFileStoreErrored, xerrors.Errorf("reading piece at path %s: %w", deal.PiecePath, err)) + return ctx.Trigger(storagemarket.ProviderEventFileStoreErrored, + xerrors.Errorf("reading piece at path %s: %w", deal.PiecePath, err)) + } + + // Hand the deal off to the process that adds it to a sector + packingInfo, err = handoffDeal(ctx.Context(), environment, deal, file, uint64(file.Size())) + if err != nil { + err = xerrors.Errorf("packing piece at path %s: %w", deal.PiecePath, err) + return ctx.Trigger(storagemarket.ProviderEventDealHandoffFailed, err) } - packingInfo, packingErr = handoffDeal(ctx.Context(), environment, deal, file, uint64(file.Size())) } else { + // Create a reader to read the piece from the blockstore pieceReader, pieceSize, err, writeErrChan := environment.GeneratePieceReader(deal.StoreID, deal.Ref.Root, shared.AllSelector()) if err != nil { + err := xerrors.Errorf("reading piece %s from store %d: %w", deal.Ref.Root, deal.StoreID, err) return ctx.Trigger(storagemarket.ProviderEventDealHandoffFailed, err) } + + // Hand the deal off to the process that adds it to a sector + var packingErr error packingInfo, packingErr = handoffDeal(ctx.Context(), environment, deal, pieceReader, pieceSize) + + // Close the read side of the pipe err = pieceReader.Close() if err != nil { - return ctx.Trigger(storagemarket.ProviderEventDealHandoffFailed, err) + err = xerrors.Errorf("closing reader for piece %s from store %d: %w", deal.Ref.Root, deal.StoreID, err) + return triggerHandoffFailed(err, packingErr) } + + // Wait for the write to complete select { case <-ctx.Context().Done(): - return ctx.Trigger(storagemarket.ProviderEventDealHandoffFailed, errors.New("write never finished")) + return ctx.Trigger(storagemarket.ProviderEventDealHandoffFailed, + xerrors.Errorf("writing piece %s never finished: %w", deal.Ref.Root, ctx.Context().Err())) case err = <-writeErrChan: + if err != nil { + err = xerrors.Errorf("writing piece %s: %w", deal.Ref.Root, err) + return triggerHandoffFailed(err, packingErr) + } } - if err != nil { + + if packingErr != nil { + err = xerrors.Errorf("packing piece %s: %w", deal.Ref.Root, packingErr) return ctx.Trigger(storagemarket.ProviderEventDealHandoffFailed, err) } } - if packingErr != nil { - return ctx.Trigger(storagemarket.ProviderEventDealHandoffFailed, packingErr) - } - if err := recordPiece(environment, deal, packingInfo.SectorNumber, packingInfo.Offset, packingInfo.Size); err != nil { - log.Errorf("failed to register deal data for retrieval: %s", err) + err = xerrors.Errorf("failed to register deal data for piece %s for retrieval: %w", deal.Ref.Root, err) + log.Error(err.Error()) _ = ctx.Trigger(storagemarket.ProviderEventPieceStoreErrored, err) } diff --git a/storagemarket/impl/providerstates/provider_states_test.go b/storagemarket/impl/providerstates/provider_states_test.go index 19692ce7..a188d739 100644 --- a/storagemarket/impl/providerstates/provider_states_test.go +++ b/storagemarket/impl/providerstates/provider_states_test.go @@ -650,7 +650,7 @@ func TestHandoffDeal(t *testing.T) { }, dealInspector: func(t *testing.T, deal storagemarket.MinerDeal, env *fakeEnvironment) { tut.AssertDealState(t, storagemarket.StorageDealAwaitingPreCommit, deal.State) - require.Equal(t, fmt.Sprintf("recording piece for retrieval: failed to load block locations: file not found"), deal.Message) + require.Equal(t, fmt.Sprintf("recording piece for retrieval: failed to register deal data for piece %s for retrieval: failed to load block locations: file not found", deal.Ref.Root), deal.Message) }, }, "add piece block locations errors": { @@ -667,7 +667,7 @@ func TestHandoffDeal(t *testing.T) { }, dealInspector: func(t *testing.T, deal storagemarket.MinerDeal, env *fakeEnvironment) { tut.AssertDealState(t, storagemarket.StorageDealAwaitingPreCommit, deal.State) - require.Equal(t, "recording piece for retrieval: failed to add piece block locations: could not add block locations", deal.Message) + require.Equal(t, fmt.Sprintf("recording piece for retrieval: failed to register deal data for piece %s for retrieval: failed to add piece block locations: could not add block locations", deal.Ref.Root), deal.Message) }, }, "add deal for piece errors": { @@ -684,7 +684,7 @@ func TestHandoffDeal(t *testing.T) { }, dealInspector: func(t *testing.T, deal storagemarket.MinerDeal, env *fakeEnvironment) { tut.AssertDealState(t, storagemarket.StorageDealAwaitingPreCommit, deal.State) - require.Equal(t, "recording piece for retrieval: failed to add deal for piece: could not add deal info", deal.Message) + require.Equal(t, fmt.Sprintf("recording piece for retrieval: failed to register deal data for piece %s for retrieval: failed to add deal for piece: could not add deal info", deal.Ref.Root), deal.Message) }, }, "opening file errors": { @@ -709,7 +709,7 @@ func TestHandoffDeal(t *testing.T) { }, dealInspector: func(t *testing.T, deal storagemarket.MinerDeal, env *fakeEnvironment) { tut.AssertDealState(t, storagemarket.StorageDealFailing, deal.State) - require.Equal(t, "handing off deal to node: failed building sector", deal.Message) + require.Equal(t, "handing off deal to node: packing piece at path file.txt: failed building sector", deal.Message) }, }, "assemble piece on demand fails immediately": { @@ -721,7 +721,7 @@ func TestHandoffDeal(t *testing.T) { }, dealInspector: func(t *testing.T, deal storagemarket.MinerDeal, env *fakeEnvironment) { tut.AssertDealState(t, storagemarket.StorageDealFailing, deal.State) - require.Equal(t, "handing off deal to node: something went wrong", deal.Message) + require.Equal(t, fmt.Sprintf("handing off deal to node: reading piece %s from store %d: something went wrong", deal.Ref.Root, deal.StoreID), deal.Message) }, }, "assemble piece on demand fails async": { @@ -733,7 +733,7 @@ func TestHandoffDeal(t *testing.T) { }, dealInspector: func(t *testing.T, deal storagemarket.MinerDeal, env *fakeEnvironment) { tut.AssertDealState(t, storagemarket.StorageDealFailing, deal.State) - require.Equal(t, "handing off deal to node: something went wrong", deal.Message) + require.Equal(t, fmt.Sprintf("handing off deal to node: writing piece %s: something went wrong", deal.Ref.Root), deal.Message) }, }, "assemble piece on demand fails closing reader": { @@ -745,7 +745,37 @@ func TestHandoffDeal(t *testing.T) { }, dealInspector: func(t *testing.T, deal storagemarket.MinerDeal, env *fakeEnvironment) { tut.AssertDealState(t, storagemarket.StorageDealFailing, deal.State) - require.Equal(t, "handing off deal to node: something went wrong", deal.Message) + require.Equal(t, fmt.Sprintf("handing off deal to node: closing reader for piece %s from store %d: something went wrong", deal.Ref.Root, deal.StoreID), deal.Message) + }, + }, + "assemble piece on demand fails closing reader and OnComplete fails": { + environmentParams: environmentParams{ + PieceReader: newStubbedReadCloser(errors.New("close reader failed")), + }, + dealParams: dealParams{ + FastRetrieval: true, + }, + nodeParams: nodeParams{ + OnDealCompleteError: errors.New("failed building sector"), + }, + dealInspector: func(t *testing.T, deal storagemarket.MinerDeal, env *fakeEnvironment) { + tut.AssertDealState(t, storagemarket.StorageDealFailing, deal.State) + require.Equal(t, fmt.Sprintf("handing off deal to node: closing reader for piece %s from store %d: close reader failed: packing error: failed building sector", deal.Ref.Root, deal.StoreID), deal.Message) + }, + }, + "assemble piece on demand fails async and OnComplete fails": { + environmentParams: environmentParams{ + GeneratePieceReaderErrAsync: errors.New("async err"), + }, + dealParams: dealParams{ + FastRetrieval: true, + }, + nodeParams: nodeParams{ + OnDealCompleteError: errors.New("failed building sector"), + }, + dealInspector: func(t *testing.T, deal storagemarket.MinerDeal, env *fakeEnvironment) { + tut.AssertDealState(t, storagemarket.StorageDealFailing, deal.State) + require.Equal(t, fmt.Sprintf("handing off deal to node: writing piece %s: async err: packing error: failed building sector", deal.Ref.Root), deal.Message) }, }, } From bbd0fd904fa813e73f47d3a72251ff1e4cadbbc4 Mon Sep 17 00:00:00 2001 From: Dirk McCormick Date: Fri, 5 Feb 2021 12:01:30 +0100 Subject: [PATCH 2/2] fix: log piece cid not root hash --- .../impl/providerstates/provider_states.go | 16 ++++++++-------- .../impl/providerstates/provider_states_test.go | 16 ++++++++-------- 2 files changed, 16 insertions(+), 16 deletions(-) diff --git a/storagemarket/impl/providerstates/provider_states.go b/storagemarket/impl/providerstates/provider_states.go index c1172d6d..6188bcc3 100644 --- a/storagemarket/impl/providerstates/provider_states.go +++ b/storagemarket/impl/providerstates/provider_states.go @@ -307,8 +307,8 @@ func HandoffDeal(ctx fsm.Context, environment ProviderDealEnvironment, deal stor var packingInfo *storagemarket.PackingResult if deal.PiecePath != "" { - // In earlier versions of lotus the piece data was written to disk, so - // if PiecePath is set, create a Reader from the file path. + // Data for offline deals is stored on disk, so if PiecePath is set, + // create a Reader from the file path file, err := environment.FileStore().Open(deal.PiecePath) if err != nil { return ctx.Trigger(storagemarket.ProviderEventFileStoreErrored, @@ -325,7 +325,7 @@ func HandoffDeal(ctx fsm.Context, environment ProviderDealEnvironment, deal stor // Create a reader to read the piece from the blockstore pieceReader, pieceSize, err, writeErrChan := environment.GeneratePieceReader(deal.StoreID, deal.Ref.Root, shared.AllSelector()) if err != nil { - err := xerrors.Errorf("reading piece %s from store %d: %w", deal.Ref.Root, deal.StoreID, err) + err := xerrors.Errorf("reading piece %s from store %d: %w", deal.Ref.PieceCid, deal.StoreID, err) return ctx.Trigger(storagemarket.ProviderEventDealHandoffFailed, err) } @@ -336,7 +336,7 @@ func HandoffDeal(ctx fsm.Context, environment ProviderDealEnvironment, deal stor // Close the read side of the pipe err = pieceReader.Close() if err != nil { - err = xerrors.Errorf("closing reader for piece %s from store %d: %w", deal.Ref.Root, deal.StoreID, err) + err = xerrors.Errorf("closing reader for piece %s from store %d: %w", deal.Ref.PieceCid, deal.StoreID, err) return triggerHandoffFailed(err, packingErr) } @@ -344,22 +344,22 @@ func HandoffDeal(ctx fsm.Context, environment ProviderDealEnvironment, deal stor select { case <-ctx.Context().Done(): return ctx.Trigger(storagemarket.ProviderEventDealHandoffFailed, - xerrors.Errorf("writing piece %s never finished: %w", deal.Ref.Root, ctx.Context().Err())) + xerrors.Errorf("writing piece %s never finished: %w", deal.Ref.PieceCid, ctx.Context().Err())) case err = <-writeErrChan: if err != nil { - err = xerrors.Errorf("writing piece %s: %w", deal.Ref.Root, err) + err = xerrors.Errorf("writing piece %s: %w", deal.Ref.PieceCid, err) return triggerHandoffFailed(err, packingErr) } } if packingErr != nil { - err = xerrors.Errorf("packing piece %s: %w", deal.Ref.Root, packingErr) + err = xerrors.Errorf("packing piece %s: %w", deal.Ref.PieceCid, packingErr) return ctx.Trigger(storagemarket.ProviderEventDealHandoffFailed, err) } } if err := recordPiece(environment, deal, packingInfo.SectorNumber, packingInfo.Offset, packingInfo.Size); err != nil { - err = xerrors.Errorf("failed to register deal data for piece %s for retrieval: %w", deal.Ref.Root, err) + err = xerrors.Errorf("failed to register deal data for piece %s for retrieval: %w", deal.Ref.PieceCid, err) log.Error(err.Error()) _ = ctx.Trigger(storagemarket.ProviderEventPieceStoreErrored, err) } diff --git a/storagemarket/impl/providerstates/provider_states_test.go b/storagemarket/impl/providerstates/provider_states_test.go index a188d739..6090a048 100644 --- a/storagemarket/impl/providerstates/provider_states_test.go +++ b/storagemarket/impl/providerstates/provider_states_test.go @@ -650,7 +650,7 @@ func TestHandoffDeal(t *testing.T) { }, dealInspector: func(t *testing.T, deal storagemarket.MinerDeal, env *fakeEnvironment) { tut.AssertDealState(t, storagemarket.StorageDealAwaitingPreCommit, deal.State) - require.Equal(t, fmt.Sprintf("recording piece for retrieval: failed to register deal data for piece %s for retrieval: failed to load block locations: file not found", deal.Ref.Root), deal.Message) + require.Equal(t, fmt.Sprintf("recording piece for retrieval: failed to register deal data for piece %s for retrieval: failed to load block locations: file not found", deal.Ref.PieceCid), deal.Message) }, }, "add piece block locations errors": { @@ -667,7 +667,7 @@ func TestHandoffDeal(t *testing.T) { }, dealInspector: func(t *testing.T, deal storagemarket.MinerDeal, env *fakeEnvironment) { tut.AssertDealState(t, storagemarket.StorageDealAwaitingPreCommit, deal.State) - require.Equal(t, fmt.Sprintf("recording piece for retrieval: failed to register deal data for piece %s for retrieval: failed to add piece block locations: could not add block locations", deal.Ref.Root), deal.Message) + require.Equal(t, fmt.Sprintf("recording piece for retrieval: failed to register deal data for piece %s for retrieval: failed to add piece block locations: could not add block locations", deal.Ref.PieceCid), deal.Message) }, }, "add deal for piece errors": { @@ -684,7 +684,7 @@ func TestHandoffDeal(t *testing.T) { }, dealInspector: func(t *testing.T, deal storagemarket.MinerDeal, env *fakeEnvironment) { tut.AssertDealState(t, storagemarket.StorageDealAwaitingPreCommit, deal.State) - require.Equal(t, fmt.Sprintf("recording piece for retrieval: failed to register deal data for piece %s for retrieval: failed to add deal for piece: could not add deal info", deal.Ref.Root), deal.Message) + require.Equal(t, fmt.Sprintf("recording piece for retrieval: failed to register deal data for piece %s for retrieval: failed to add deal for piece: could not add deal info", deal.Ref.PieceCid), deal.Message) }, }, "opening file errors": { @@ -721,7 +721,7 @@ func TestHandoffDeal(t *testing.T) { }, dealInspector: func(t *testing.T, deal storagemarket.MinerDeal, env *fakeEnvironment) { tut.AssertDealState(t, storagemarket.StorageDealFailing, deal.State) - require.Equal(t, fmt.Sprintf("handing off deal to node: reading piece %s from store %d: something went wrong", deal.Ref.Root, deal.StoreID), deal.Message) + require.Equal(t, fmt.Sprintf("handing off deal to node: reading piece %s from store %d: something went wrong", deal.Ref.PieceCid, deal.StoreID), deal.Message) }, }, "assemble piece on demand fails async": { @@ -733,7 +733,7 @@ func TestHandoffDeal(t *testing.T) { }, dealInspector: func(t *testing.T, deal storagemarket.MinerDeal, env *fakeEnvironment) { tut.AssertDealState(t, storagemarket.StorageDealFailing, deal.State) - require.Equal(t, fmt.Sprintf("handing off deal to node: writing piece %s: something went wrong", deal.Ref.Root), deal.Message) + require.Equal(t, fmt.Sprintf("handing off deal to node: writing piece %s: something went wrong", deal.Ref.PieceCid), deal.Message) }, }, "assemble piece on demand fails closing reader": { @@ -745,7 +745,7 @@ func TestHandoffDeal(t *testing.T) { }, dealInspector: func(t *testing.T, deal storagemarket.MinerDeal, env *fakeEnvironment) { tut.AssertDealState(t, storagemarket.StorageDealFailing, deal.State) - require.Equal(t, fmt.Sprintf("handing off deal to node: closing reader for piece %s from store %d: something went wrong", deal.Ref.Root, deal.StoreID), deal.Message) + require.Equal(t, fmt.Sprintf("handing off deal to node: closing reader for piece %s from store %d: something went wrong", deal.Ref.PieceCid, deal.StoreID), deal.Message) }, }, "assemble piece on demand fails closing reader and OnComplete fails": { @@ -760,7 +760,7 @@ func TestHandoffDeal(t *testing.T) { }, dealInspector: func(t *testing.T, deal storagemarket.MinerDeal, env *fakeEnvironment) { tut.AssertDealState(t, storagemarket.StorageDealFailing, deal.State) - require.Equal(t, fmt.Sprintf("handing off deal to node: closing reader for piece %s from store %d: close reader failed: packing error: failed building sector", deal.Ref.Root, deal.StoreID), deal.Message) + require.Equal(t, fmt.Sprintf("handing off deal to node: closing reader for piece %s from store %d: close reader failed: packing error: failed building sector", deal.Ref.PieceCid, deal.StoreID), deal.Message) }, }, "assemble piece on demand fails async and OnComplete fails": { @@ -775,7 +775,7 @@ func TestHandoffDeal(t *testing.T) { }, dealInspector: func(t *testing.T, deal storagemarket.MinerDeal, env *fakeEnvironment) { tut.AssertDealState(t, storagemarket.StorageDealFailing, deal.State) - require.Equal(t, fmt.Sprintf("handing off deal to node: writing piece %s: async err: packing error: failed building sector", deal.Ref.Root), deal.Message) + require.Equal(t, fmt.Sprintf("handing off deal to node: writing piece %s: async err: packing error: failed building sector", deal.Ref.PieceCid), deal.Message) }, }, }