diff --git a/dagstore/market_api.go b/dagstore/market_api.go index fa2e4106..7b588e33 100644 --- a/dagstore/market_api.go +++ b/dagstore/market_api.go @@ -2,6 +2,7 @@ package dagstore import ( "context" + "errors" "fmt" "io" @@ -15,12 +16,10 @@ import ( "github.com/filecoin-project/dagstore/throttle" "github.com/filecoin-project/go-padreader" gatewayAPIV2 "github.com/filecoin-project/venus/venus-shared/api/gateway/v2" - vSharedTypes "github.com/filecoin-project/venus/venus-shared/types" marketMetrics "github.com/filecoin-project/venus-market/v2/metrics" "github.com/filecoin-project/venus-market/v2/models/repo" "github.com/filecoin-project/venus-market/v2/piecestorage" - "github.com/filecoin-project/venus-market/v2/storageprovider" "github.com/filecoin-project/venus-market/v2/utils" ) @@ -70,55 +69,10 @@ func (m *marketAPI) IsUnsealed(ctx context.Context, pieceCid cid.Cid) (bool, err _, err := m.pieceStorageMgr.FindStorageForRead(ctx, pieceCid.String()) if err != nil { log.Warnf("unable to find storage for piece %s: %s", pieceCid, err) - - // check it from the SP through venus-gateway - deals, err := m.pieceRepo.GetDealsByPieceCidAndStatus(ctx, pieceCid, storageprovider.ReadyRetrievalDealStatus...) - if err != nil { - return false, fmt.Errorf("get delas for piece %s: %w", pieceCid, err) + if errors.Is(err, piecestorage.ErrorNotFoundForRead) { + return false, nil } - - if len(deals) == 0 { - return false, fmt.Errorf("no storage deals found for piece %s", pieceCid) - } - - // check if we have an unsealed deal for the given piece in any of the unsealed sectors. - for _, deal := range deals { - deal := deal - - // Throttle this path to avoid flooding the storage subsystem. - err := m.throttle.Do(ctx, func(ctx context.Context) (err error) { - - // send SectorsUnsealPiece task - wps, err := m.pieceStorageMgr.FindStorageForWrite(int64(deal.Proposal.PieceSize)) - if err != nil { - return fmt.Errorf("failed to find storage to write %s: %w", pieceCid, err) - } - - pieceTransfer, err := wps.GetPieceTransfer(ctx, pieceCid.String()) - if err != nil { - return fmt.Errorf("get piece transfer for %s: %w", pieceCid, err) - } - - return m.gatewayMarketClient.SectorsUnsealPiece( - ctx, - deal.Proposal.Provider, - pieceCid, - deal.SectorNumber, - vSharedTypes.PaddedByteIndex(deal.Offset.Unpadded()), - deal.Proposal.PieceSize, - pieceTransfer, - ) - }) - - if err != nil { - log.Warnf("failed to check/retrieve unsealed sector: %s", err) - continue // move on to the next match. - } - return true, nil - } - - // we don't have an unsealed sector containing the piece - return false, nil + return false, err } return true, nil @@ -132,8 +86,9 @@ func (m *marketAPI) FetchFromPieceStorage(ctx context.Context, pieceCid cid.Cid) pieceStorage, err := m.pieceStorageMgr.FindStorageForRead(ctx, pieceCid.String()) if err != nil { - return nil, err + return nil, fmt.Errorf("find piece for read: %w", err) } + storageName := pieceStorage.GetName() size, err := pieceStorage.Len(ctx, pieceCid.String()) if err != nil { diff --git a/dagstore/mocks/mock_dagstore_interface.go b/dagstore/mocks/mock_dagstore_interface.go new file mode 100644 index 00000000..c15314dc --- /dev/null +++ b/dagstore/mocks/mock_dagstore_interface.go @@ -0,0 +1,198 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: github.com/filecoin-project/dagstore (interfaces: Interface) + +// Package mock_dagstore is a generated GoMock package. +package mock_dagstore + +import ( + context "context" + reflect "reflect" + + dagstore "github.com/filecoin-project/dagstore" + mount "github.com/filecoin-project/dagstore/mount" + shard "github.com/filecoin-project/dagstore/shard" + gomock "github.com/golang/mock/gomock" + index "github.com/ipld/go-car/v2/index" + multihash "github.com/multiformats/go-multihash" +) + +// MockDagStoreInterface is a mock of Interface interface. +type MockDagStoreInterface struct { + ctrl *gomock.Controller + recorder *MockDagStoreInterfaceMockRecorder +} + +// MockDagStoreInterfaceMockRecorder is the mock recorder for MockDagStoreInterface. +type MockDagStoreInterfaceMockRecorder struct { + mock *MockDagStoreInterface +} + +// NewMockDagStoreInterface creates a new mock instance. +func NewMockDagStoreInterface(ctrl *gomock.Controller) *MockDagStoreInterface { + mock := &MockDagStoreInterface{ctrl: ctrl} + mock.recorder = &MockDagStoreInterfaceMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockDagStoreInterface) EXPECT() *MockDagStoreInterfaceMockRecorder { + return m.recorder +} + +// AcquireShard mocks base method. +func (m *MockDagStoreInterface) AcquireShard(arg0 context.Context, arg1 shard.Key, arg2 chan dagstore.ShardResult, arg3 dagstore.AcquireOpts) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "AcquireShard", arg0, arg1, arg2, arg3) + ret0, _ := ret[0].(error) + return ret0 +} + +// AcquireShard indicates an expected call of AcquireShard. +func (mr *MockDagStoreInterfaceMockRecorder) AcquireShard(arg0, arg1, arg2, arg3 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AcquireShard", reflect.TypeOf((*MockDagStoreInterface)(nil).AcquireShard), arg0, arg1, arg2, arg3) +} + +// AllShardsInfo mocks base method. +func (m *MockDagStoreInterface) AllShardsInfo() dagstore.AllShardsInfo { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "AllShardsInfo") + ret0, _ := ret[0].(dagstore.AllShardsInfo) + return ret0 +} + +// AllShardsInfo indicates an expected call of AllShardsInfo. +func (mr *MockDagStoreInterfaceMockRecorder) AllShardsInfo() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AllShardsInfo", reflect.TypeOf((*MockDagStoreInterface)(nil).AllShardsInfo)) +} + +// Close mocks base method. +func (m *MockDagStoreInterface) Close() error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Close") + ret0, _ := ret[0].(error) + return ret0 +} + +// Close indicates an expected call of Close. +func (mr *MockDagStoreInterfaceMockRecorder) Close() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Close", reflect.TypeOf((*MockDagStoreInterface)(nil).Close)) +} + +// DestroyShard mocks base method. +func (m *MockDagStoreInterface) DestroyShard(arg0 context.Context, arg1 shard.Key, arg2 chan dagstore.ShardResult, arg3 dagstore.DestroyOpts) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "DestroyShard", arg0, arg1, arg2, arg3) + ret0, _ := ret[0].(error) + return ret0 +} + +// DestroyShard indicates an expected call of DestroyShard. +func (mr *MockDagStoreInterfaceMockRecorder) DestroyShard(arg0, arg1, arg2, arg3 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DestroyShard", reflect.TypeOf((*MockDagStoreInterface)(nil).DestroyShard), arg0, arg1, arg2, arg3) +} + +// GC mocks base method. +func (m *MockDagStoreInterface) GC(arg0 context.Context) (*dagstore.GCResult, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GC", arg0) + ret0, _ := ret[0].(*dagstore.GCResult) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// GC indicates an expected call of GC. +func (mr *MockDagStoreInterfaceMockRecorder) GC(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GC", reflect.TypeOf((*MockDagStoreInterface)(nil).GC), arg0) +} + +// GetIterableIndex mocks base method. +func (m *MockDagStoreInterface) GetIterableIndex(arg0 shard.Key) (index.IterableIndex, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetIterableIndex", arg0) + ret0, _ := ret[0].(index.IterableIndex) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// GetIterableIndex indicates an expected call of GetIterableIndex. +func (mr *MockDagStoreInterfaceMockRecorder) GetIterableIndex(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetIterableIndex", reflect.TypeOf((*MockDagStoreInterface)(nil).GetIterableIndex), arg0) +} + +// GetShardInfo mocks base method. +func (m *MockDagStoreInterface) GetShardInfo(arg0 shard.Key) (dagstore.ShardInfo, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetShardInfo", arg0) + ret0, _ := ret[0].(dagstore.ShardInfo) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// GetShardInfo indicates an expected call of GetShardInfo. +func (mr *MockDagStoreInterfaceMockRecorder) GetShardInfo(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetShardInfo", reflect.TypeOf((*MockDagStoreInterface)(nil).GetShardInfo), arg0) +} + +// RecoverShard mocks base method. +func (m *MockDagStoreInterface) RecoverShard(arg0 context.Context, arg1 shard.Key, arg2 chan dagstore.ShardResult, arg3 dagstore.RecoverOpts) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "RecoverShard", arg0, arg1, arg2, arg3) + ret0, _ := ret[0].(error) + return ret0 +} + +// RecoverShard indicates an expected call of RecoverShard. +func (mr *MockDagStoreInterfaceMockRecorder) RecoverShard(arg0, arg1, arg2, arg3 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RecoverShard", reflect.TypeOf((*MockDagStoreInterface)(nil).RecoverShard), arg0, arg1, arg2, arg3) +} + +// RegisterShard mocks base method. +func (m *MockDagStoreInterface) RegisterShard(arg0 context.Context, arg1 shard.Key, arg2 mount.Mount, arg3 chan dagstore.ShardResult, arg4 dagstore.RegisterOpts) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "RegisterShard", arg0, arg1, arg2, arg3, arg4) + ret0, _ := ret[0].(error) + return ret0 +} + +// RegisterShard indicates an expected call of RegisterShard. +func (mr *MockDagStoreInterfaceMockRecorder) RegisterShard(arg0, arg1, arg2, arg3, arg4 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RegisterShard", reflect.TypeOf((*MockDagStoreInterface)(nil).RegisterShard), arg0, arg1, arg2, arg3, arg4) +} + +// ShardsContainingMultihash mocks base method. +func (m *MockDagStoreInterface) ShardsContainingMultihash(arg0 context.Context, arg1 multihash.Multihash) ([]shard.Key, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "ShardsContainingMultihash", arg0, arg1) + ret0, _ := ret[0].([]shard.Key) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// ShardsContainingMultihash indicates an expected call of ShardsContainingMultihash. +func (mr *MockDagStoreInterfaceMockRecorder) ShardsContainingMultihash(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ShardsContainingMultihash", reflect.TypeOf((*MockDagStoreInterface)(nil).ShardsContainingMultihash), arg0, arg1) +} + +// Start mocks base method. +func (m *MockDagStoreInterface) Start(arg0 context.Context) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Start", arg0) + ret0, _ := ret[0].(error) + return ret0 +} + +// Start indicates an expected call of Start. +func (mr *MockDagStoreInterfaceMockRecorder) Start(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Start", reflect.TypeOf((*MockDagStoreInterface)(nil).Start), arg0) +} diff --git a/dagstore/wrapper.go b/dagstore/wrapper.go index 82b53061..0b55bb71 100644 --- a/dagstore/wrapper.go +++ b/dagstore/wrapper.go @@ -211,36 +211,63 @@ func (w *Wrapper) gcLoop() { } func (w *Wrapper) LoadShard(ctx context.Context, pieceCid cid.Cid) (stores.ClosableBlockstore, error) { - log.Debugf("acquiring shard for piece CID %s", pieceCid) + log := log.With("piece-cid", pieceCid) + log.Debug("acquiring shard") key := shard.KeyFromCID(pieceCid) - resch := make(chan dagstore.ShardResult, 1) - err := w.dagst.AcquireShard(ctx, key, resch, dagstore.AcquireOpts{}) - log.Debugf("sent message to acquire shard for piece CID %s", pieceCid) - if err != nil { - if !errors.Is(err, dagstore.ErrShardUnknown) { - return nil, fmt.Errorf("failed to schedule acquire shard for piece CID %s: %w", pieceCid, err) + // get shard info + var sInfo dagstore.ShardInfo + var err error + retryCount := 5 + for i := retryCount; i >= 0; i-- { + if i == 0 { + return nil, fmt.Errorf("failed to get shard info for piece CID %s, after %d retry : %w", pieceCid, i, err) } - // if the DAGStore does not know about the Shard -> register it and then try to acquire it again. - log.Warnw("failed to load shard as shard is not registered, will re-register", "pieceCID", pieceCid) - // The path of a transient file that we can ask the DAG Store to use - // to perform the Indexing rather than fetching it via the Mount if - // we already have a transient file. However, we don't have it here - // and therefore we pass an empty file path. - carPath := "" - if err := stores.RegisterShardSync(ctx, w, pieceCid, carPath, false); err != nil { - return nil, fmt.Errorf("failed to re-register shard during loading piece CID %s: %w", pieceCid, err) + sInfo, err = w.dagst.GetShardInfo(key) + if err != nil { + if errors.Is(err, dagstore.ErrShardUnknown) { + log.Warn("shard not found, try to re-register") + if err := stores.RegisterShardSync(ctx, w, pieceCid, "", false); err != nil { + return nil, fmt.Errorf("failed to re-register shard during loading pieceCID %s: %w", pieceCid, err) + } + continue + } else { + return nil, fmt.Errorf("failed to get shard info for piece CID %s: %w", pieceCid, err) + } } - log.Warnw("successfully re-registered shard", "pieceCID", pieceCid) - - resch = make(chan dagstore.ShardResult, 1) - if err := w.dagst.AcquireShard(ctx, key, resch, dagstore.AcquireOpts{}); err != nil { - return nil, fmt.Errorf("failed to acquire Shard for piece CID %s after re-registering: %w", pieceCid, err) + break + } + + // check state + log.Infof("shard state: %s", sInfo.ShardState.String()) + switch sInfo.ShardState { + case dagstore.ShardStateErrored: + // try to recover + log.Warn("shard is in errored state, try to recover") + recoverRes := make(chan dagstore.ShardResult, 1) + if err := w.dagst.RecoverShard(ctx, key, recoverRes, dagstore.RecoverOpts{}); err != nil { + return nil, fmt.Errorf("failed to recover shard for piece CID %s: %w", pieceCid, err) + } + select { + case res := <-recoverRes: + if res.Error != nil { + return nil, fmt.Errorf("failed to recover shard for piece CID %s: %w", pieceCid, res.Error) + } + case <-ctx.Done(): + return nil, ctx.Err() } } + resCh := make(chan dagstore.ShardResult, 1) + err = w.dagst.AcquireShard(ctx, key, resCh, dagstore.AcquireOpts{}) + log.Debugf("sent message to acquire shard for piece CID %s", pieceCid) + + if err != nil { + return nil, fmt.Errorf("failed to acquire shard for piece CID %s: %w", pieceCid, err) + } + // TODO: The context is not yet being actively monitored by the DAG store, // so we need to select against ctx.Done() until the following issue is // implemented: @@ -249,7 +276,7 @@ func (w *Wrapper) LoadShard(ctx context.Context, pieceCid cid.Cid) (stores.Closa select { case <-ctx.Done(): return nil, ctx.Err() - case res = <-resch: + case res = <-resCh: if res.Error != nil { return nil, fmt.Errorf("failed to acquire shard for piece CID %s: %w", pieceCid, res.Error) } diff --git a/dagstore/wrapper_test.go b/dagstore/wrapper_test.go index 32a1e998..13f64ecf 100644 --- a/dagstore/wrapper_test.go +++ b/dagstore/wrapper_test.go @@ -8,9 +8,11 @@ import ( "testing" "time" + "github.com/golang/mock/gomock" mh "github.com/multiformats/go-multihash" "github.com/filecoin-project/venus-market/v2/config" + mock_dagstore "github.com/filecoin-project/venus-market/v2/dagstore/mocks" carindex "github.com/ipld/go-car/v2/index" "github.com/filecoin-project/dagstore" @@ -21,9 +23,11 @@ import ( "github.com/stretchr/testify/require" ) -// TestWrapperAcquireRecoveryDestroy verifies that if acquire shard returns a "not found" +//go:generate go run github.com/golang/mock/mockgen -destination=./mocks/mock_dagstore_interface.go -package=mock_dagstore -mock_names Interface=MockDagStoreInterface github.com/filecoin-project/dagstore Interface + +// TestWrapperLoadShard verifies that if acquire shard returns a "not found" // error, the wrapper will attempt to register the shard then reacquire -func TestWrapperAcquireRecoveryDestroy(t *testing.T) { +func TestWrapperLoadShard(t *testing.T) { ctx := context.Background() pieceCid, err := cid.Parse("bafkqaaa") require.NoError(t, err) @@ -41,64 +45,52 @@ func TestWrapperAcquireRecoveryDestroy(t *testing.T) { acquireShardErr := make(chan error, 1) acquireShardErr <- fmt.Errorf("unknown shard: %w", dagstore.ErrShardUnknown) - // Create a mock DAG store in place of the real DAG store - mock := &mockDagStore{ - acquireShardErr: acquireShardErr, - acquireShardRes: dagstore.ShardResult{ - Accessor: getShardAccessor(t), - }, - register: make(chan shard.Key, 1), - destroy: make(chan shard.Key, 1), - } - w.dagst = mock - - mybs, err := w.LoadShard(ctx, pieceCid) - require.NoError(t, err) - - // Expect the wrapper to try to recover from the error returned from - // acquire shard by calling register shard with the same key - tctx, cancel := context.WithTimeout(ctx, time.Second) - defer cancel() - select { - case <-tctx.Done(): - require.Fail(t, "failed to call register") - case k := <-mock.register: - require.Equal(t, k.String(), pieceCid.String()) - } - - // Verify that we can get things from the acquired blockstore - var count int - ch, err := mybs.AllKeysChan(ctx) - require.NoError(t, err) - for range ch { - count++ - } - require.Greater(t, count, 0) - - // Destroy the shard - dr := make(chan dagstore.ShardResult, 1) - err = w.DestroyShard(ctx, pieceCid, dr) - require.NoError(t, err) - res := <-dr - require.NoError(t, res.Error) - require.Equal(t, shard.KeyFromCID(pieceCid), res.Key) - - dctx, cancel := context.WithTimeout(ctx, time.Second) - defer cancel() - select { - case <-dctx.Done(): - require.Fail(t, "failed to call destroy") - case k := <-mock.destroy: - require.Equal(t, k.String(), pieceCid.String()) - } - - var dcount int - dch, err := mybs.AllKeysChan(ctx) - require.NoError(t, err) - for range dch { - count++ - } - require.Equal(t, dcount, 0) + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + t.Run("re-register shard when not found", func(t *testing.T) { + dagMock := mock_dagstore.NewMockDagStoreInterface(ctrl) + w.dagst = dagMock + + dagMock.EXPECT().GetShardInfo(gomock.Any()).Return(dagstore.ShardInfo{}, dagstore.ErrShardUnknown) + dagMock.EXPECT().RegisterShard(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn(func(ctx context.Context, key shard.Key, mnt mount.Mount, out chan dagstore.ShardResult, opts dagstore.RegisterOpts) error { + out <- dagstore.ShardResult{} + return nil + }) + dagMock.EXPECT().GetShardInfo(gomock.Any()).Return(dagstore.ShardInfo{ + ShardState: dagstore.ShardStateAvailable, + }, nil) + dagMock.EXPECT().AcquireShard(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn(func(ctx context.Context, key shard.Key, out chan dagstore.ShardResult, _ dagstore.AcquireOpts) error { + out <- dagstore.ShardResult{ + Accessor: getShardAccessor(t), + } + return nil + }) + + _, err = w.LoadShard(ctx, pieceCid) + require.NoError(t, err) + }) + + t.Run("recover shard when shard state error", func(t *testing.T) { + dagMock := mock_dagstore.NewMockDagStoreInterface(ctrl) + w.dagst = dagMock + + dagMock.EXPECT().GetShardInfo(gomock.Any()).Return(dagstore.ShardInfo{ShardState: dagstore.ShardStateErrored}, nil) + dagMock.EXPECT().RecoverShard(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn(func(ctx context.Context, key shard.Key, out chan dagstore.ShardResult, _ dagstore.RecoverOpts) error { + out <- dagstore.ShardResult{} + return nil + }) + dagMock.EXPECT().AcquireShard(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn(func(ctx context.Context, key shard.Key, out chan dagstore.ShardResult, _ dagstore.AcquireOpts) error { + out <- dagstore.ShardResult{ + Accessor: getShardAccessor(t), + } + return nil + }) + + _, err = w.LoadShard(ctx, pieceCid) + require.NoError(t, err) + + }) } // TestWrapperBackground verifies the behaviour of the background go routine diff --git a/go.mod b/go.mod index f508e6e1..0f1e8bca 100644 --- a/go.mod +++ b/go.mod @@ -30,8 +30,8 @@ require ( github.com/filecoin-project/go-statestore v0.2.0 github.com/filecoin-project/specs-actors/v2 v2.3.6 github.com/filecoin-project/specs-actors/v7 v7.0.1 - github.com/filecoin-project/venus v1.11.0 - github.com/filecoin-project/venus-auth v1.11.0 + github.com/filecoin-project/venus v1.11.2-0.20230525071843-ab19674dd7b3 + github.com/filecoin-project/venus-auth v1.11.1-0.20230511013901-7829b3effbcd github.com/filecoin-project/venus-messager v1.11.0 github.com/golang/mock v1.6.0 github.com/google/uuid v1.3.0 @@ -40,7 +40,7 @@ require ( github.com/howeyc/gopass v0.0.0-20210920133722-c8aef6fb66ef github.com/ipfs-force-community/metrics v1.0.1-0.20220824061112-ac916bacf2ea github.com/ipfs-force-community/venus-common-utils v0.0.0-20220217030526-e5e4c6bc14f7 - github.com/ipfs-force-community/venus-gateway v1.11.0 + github.com/ipfs-force-community/venus-gateway v1.11.2-0.20230525072131-90ce0561276b github.com/ipfs/go-blockservice v0.5.0 github.com/ipfs/go-cid v0.3.2 github.com/ipfs/go-cidutil v0.1.0 diff --git a/go.sum b/go.sum index 0d3f8721..1ad509f1 100644 --- a/go.sum +++ b/go.sum @@ -468,11 +468,11 @@ github.com/filecoin-project/storetheindex v0.4.30-0.20221114113647-683091f8e893 github.com/filecoin-project/storetheindex v0.4.30-0.20221114113647-683091f8e893/go.mod h1:S7590oDimBvXMUtzWsBXoshu9HtYKwtXl47zAK9rcP8= github.com/filecoin-project/test-vectors/schema v0.0.5/go.mod h1:iQ9QXLpYWL3m7warwvK1JC/pTri8mnfEmKygNDqqY6E= github.com/filecoin-project/venus v1.2.4/go.mod h1:hJULXHGAnWuq5S5KRtPkwbT8DqgM9II7NwyNU7t59D0= -github.com/filecoin-project/venus v1.11.0 h1:cH7ydd+O2dw7zg8tKfeiuwVd5SokZ8TBu+WoBU60pAA= -github.com/filecoin-project/venus v1.11.0/go.mod h1:H8A3djsrHKRWuKnJI/8Y6xZRudbV9V2x5NIP8/PVPfQ= +github.com/filecoin-project/venus v1.11.2-0.20230525071843-ab19674dd7b3 h1:xUiO9SyFj6NdpFIZohgo1fqqmR0DEZm5pzew+0dsBMM= +github.com/filecoin-project/venus v1.11.2-0.20230525071843-ab19674dd7b3/go.mod h1:9QctTOegFH+hZ5icsuR2BRRnZMhyLDQa5uJAbOog76M= github.com/filecoin-project/venus-auth v1.3.2/go.mod h1:m5Jog2GYxztwP7w3m/iJdv/V1/bTcAVU9rm/CbhxRQU= -github.com/filecoin-project/venus-auth v1.11.0 h1:9PBswWxc113vqaHABMcRyMm+1BtlJCwOFTPQJg/OVtQ= -github.com/filecoin-project/venus-auth v1.11.0/go.mod h1:aBfIfNxQkdcY8Rk5wrQn9qRtJpH4RTDdc10Ac+ferzs= +github.com/filecoin-project/venus-auth v1.11.1-0.20230511013901-7829b3effbcd h1:l02UJuEbSUIBi3NC/+17K2gBbAzsUNQg42rNCXskOBc= +github.com/filecoin-project/venus-auth v1.11.1-0.20230511013901-7829b3effbcd/go.mod h1:PoTmfEn5lljjAQThBzX0+friJYGgi7Z3VLLujkOkCT4= github.com/filecoin-project/venus-messager v1.11.0 h1:OJNSnWqhQl9PLzwRNR3huz49k+SU6pQ4DzEG4TXDqic= github.com/filecoin-project/venus-messager v1.11.0/go.mod h1:dZlz/xrF2SREfmR3/6xp3AMP9FthCW68N3zEvBOnqM4= github.com/flynn/go-shlex v0.0.0-20150515145356-3f9db97f8568/go.mod h1:xEzjJPgXI435gkrCt3MPfRiAkVrwSbHsst4LCFVfpJc= @@ -835,8 +835,8 @@ github.com/ipfs-force-community/metrics v1.0.1-0.20220824061112-ac916bacf2ea/go. github.com/ipfs-force-community/venus-common-utils v0.0.0-20210924063144-1d3a5b30de87/go.mod h1:RTVEOzM+hkpqmcEWpyLDkx1oGO5r9ZWCgYxG/CsXzJQ= github.com/ipfs-force-community/venus-common-utils v0.0.0-20220217030526-e5e4c6bc14f7 h1:v/1/INcqm3kHLauWQYB63MwWJRWGz+3WEuUPp0jzIl8= github.com/ipfs-force-community/venus-common-utils v0.0.0-20220217030526-e5e4c6bc14f7/go.mod h1:sSTUXgIu95tPHvgcYhdLuELmgPJWCP/pNMFtsrVtOyA= -github.com/ipfs-force-community/venus-gateway v1.11.0 h1:wcnmOtaYC39lz4Q02mDioFmWV9K2k8wRgNx7trIcHTY= -github.com/ipfs-force-community/venus-gateway v1.11.0/go.mod h1:gMemySuLZkGC+PVO8+5ru7vLwRuD4cNZn7bsLVYiuP0= +github.com/ipfs-force-community/venus-gateway v1.11.2-0.20230525072131-90ce0561276b h1:WJYSYAcvHqo6ht8TRPX8tt1R9ki7fpa2zraiHIw1DUc= +github.com/ipfs-force-community/venus-gateway v1.11.2-0.20230525072131-90ce0561276b/go.mod h1:fJzlqs3UfzmPXyuIfFc0tuUWJ7L+7hmcBxFl6jsRUWE= github.com/ipfs/bbloom v0.0.1/go.mod h1:oqo8CVWsJFMOZqTglBG4wydCE4IQA/G2/SEofB0rjUI= github.com/ipfs/bbloom v0.0.4 h1:Gi+8EGJ2y5qiD5FbsbpX/TMNcJw8gSqr7eyjHa4Fhvs= github.com/ipfs/bbloom v0.0.4/go.mod h1:cS9YprKXpoZ9lT0n/Mw/a6/aFV6DTjTLYHeA+gyqMG0= @@ -850,8 +850,8 @@ github.com/ipfs/go-bitswap v0.1.8/go.mod h1:TOWoxllhccevbWFUR2N7B1MTSVVge1s6XSMi github.com/ipfs/go-bitswap v0.3.4/go.mod h1:4T7fvNv/LmOys+21tnLzGKncMeeXUYUd1nUiJ2teMvI= github.com/ipfs/go-bitswap v0.5.1/go.mod h1:P+ckC87ri1xFLvk74NlXdP0Kj9RmWAh4+H78sC6Qopo= github.com/ipfs/go-bitswap v0.6.0/go.mod h1:Hj3ZXdOC5wBJvENtdqsixmzzRukqd8EHLxZLZc3mzRA= -github.com/ipfs/go-bitswap v0.11.0 h1:j1WVvhDX1yhG32NTC9xfxnqycqYIlhzEzLXG/cU1HyQ= github.com/ipfs/go-bitswap v0.11.0/go.mod h1:05aE8H3XOU+LXpTedeAS0OZpcO1WFsj5niYQH9a1Tmk= +github.com/ipfs/go-bitswap v0.12.0 h1:ClbLaufwv8SRQK0sBhl4wDVqJoZGAGMVxdjQy5CTt6c= github.com/ipfs/go-block-format v0.0.1/go.mod h1:DK/YYcsSUIVAFNwo/KZCdIIbpN0ROH/baNLgayt4pFc= github.com/ipfs/go-block-format v0.0.2/go.mod h1:AWR46JfpcObNfg3ok2JHDUfdiHRgWhJgCQF+KIgOPJY= github.com/ipfs/go-block-format v0.0.3/go.mod h1:4LmD4ZUw0mhO+JSKdpWwrzATiEfM7WWgQ8H5l6P8MVk= diff --git a/piecestorage/filestore.go b/piecestorage/filestore.go index cff96862..b4c83ef7 100644 --- a/piecestorage/filestore.go +++ b/piecestorage/filestore.go @@ -95,8 +95,8 @@ func (f *fsPieceStorage) GetPieceTransfer(_ context.Context, pieceCid string) (s return "", fmt.Errorf("%s id readonly piece store", f.fsCfg.Name) } - // url example: http://market/resource?resource-id=xxx&store=xxx - url := fmt.Sprintf("/resource?resource-id=%s&store=%s", pieceCid, f.fsCfg.Name) + // url example: market://store_name/piece_cid => http://market_ip/resource?resource-id=piece_cid&store=store_name + url := fmt.Sprintf("market://%s/%s", f.fsCfg.Name, pieceCid) return url, nil } diff --git a/piecestorage/storagemgr.go b/piecestorage/storagemgr.go index 9c55c507..e213f945 100644 --- a/piecestorage/storagemgr.go +++ b/piecestorage/storagemgr.go @@ -10,6 +10,8 @@ import ( types "github.com/filecoin-project/venus/venus-shared/types/market" ) +var ErrorNotFoundForRead = fmt.Errorf("not found for read") + type PieceStorageManager struct { lk sync.RWMutex storages map[string]IPieceStorage @@ -74,7 +76,7 @@ func (p *PieceStorageManager) FindStorageForRead(ctx context.Context, s string) }) if len(storages) == 0 { - return nil, fmt.Errorf("unable to find piece %s in storage", s) + return nil, fmt.Errorf("unable to find piece %s: %w", s, ErrorNotFoundForRead) } return randStorageSelector(storages) diff --git a/retrievalprovider/datatransfer_handler.go b/retrievalprovider/datatransfer_handler.go index 71d4d071..439a7094 100644 --- a/retrievalprovider/datatransfer_handler.go +++ b/retrievalprovider/datatransfer_handler.go @@ -55,7 +55,12 @@ func (d *DataTransferHandler) HandleAcceptFor(ctx context.Context, identifier rm case rm.DealStatusFundsNeededUnseal: // nothing needs to do. return nil case rm.DealStatusNew: - return d.retrievalDealHandler.UnsealData(ctx, deal) + err := d.retrievalDealHandler.UnsealData(ctx, deal) + if err != nil { + log.Errorf("unseal data error: %s", err.Error()) + return err + } + return nil default: return fmt.Errorf("invalid state transition, state `%+v`, event `%+v`", deal.Status, rm.ProviderEventDealAccepted) } diff --git a/retrievalprovider/provider.go b/retrievalprovider/provider.go index a3ece11a..b103f98c 100644 --- a/retrievalprovider/provider.go +++ b/retrievalprovider/provider.go @@ -16,8 +16,10 @@ import ( "github.com/filecoin-project/venus-market/v2/models/repo" "github.com/filecoin-project/venus-market/v2/network" "github.com/filecoin-project/venus-market/v2/paychmgr" + "github.com/filecoin-project/venus-market/v2/piecestorage" v1api "github.com/filecoin-project/venus/venus-shared/api/chain/v1" + "github.com/filecoin-project/venus/venus-shared/api/gateway/v2" types "github.com/filecoin-project/venus/venus-shared/types/market" ) @@ -59,6 +61,8 @@ func NewProvider( repo repo.Repo, cfg *config.MarketConfig, rdf config.RetrievalDealFilter, + pieceStorageMgr *piecestorage.PieceStorageManager, + gatewayMarketClient gateway.IMarketClient, ) (*RetrievalProvider, error) { storageDealsRepo := repo.StorageDealRepo() retrievalDealRepo := repo.RetrievalDealRepo() @@ -75,7 +79,7 @@ func NewProvider( retrievalStreamHandler: NewRetrievalStreamHandler(cfg, retrievalAskRepo, retrievalDealRepo, storageDealsRepo, pieceInfo), } - retrievalHandler := NewRetrievalDealHandler(&providerDealEnvironment{p}, retrievalDealRepo, storageDealsRepo) + retrievalHandler := NewRetrievalDealHandler(&providerDealEnvironment{p}, retrievalDealRepo, storageDealsRepo, gatewayMarketClient, pieceStorageMgr) p.requestValidator = NewProviderRequestValidator(cfg, storageDealsRepo, retrievalDealRepo, retrievalAskRepo, pieceInfo, rdf) transportConfigurer := dtutils.TransportConfigurer(network.ID(), &providerStoreGetter{retrievalDealRepo, p.stores}) p.reValidator = NewProviderRevalidator(fullNode, payAPI, retrievalDealRepo, retrievalHandler) diff --git a/retrievalprovider/retrieval_handler.go b/retrievalprovider/retrieval_handler.go index d149300f..e0854f09 100644 --- a/retrievalprovider/retrieval_handler.go +++ b/retrievalprovider/retrieval_handler.go @@ -3,69 +3,163 @@ package retrievalprovider import ( "context" "errors" + "fmt" + "time" - types "github.com/filecoin-project/venus/venus-shared/types/market" + vtypes "github.com/filecoin-project/venus/venus-shared/types" + gtypes "github.com/filecoin-project/venus/venus-shared/types/gateway" + mktypes "github.com/filecoin-project/venus/venus-shared/types/market" rm "github.com/filecoin-project/go-fil-markets/retrievalmarket" "github.com/filecoin-project/go-statemachine" "github.com/filecoin-project/venus-market/v2/models/repo" + "github.com/filecoin-project/venus-market/v2/piecestorage" + "github.com/filecoin-project/venus/venus-shared/api/gateway/v2" ) type IRetrievalHandler interface { - UnsealData(ctx context.Context, deal *types.ProviderDealState) error - CancelDeal(ctx context.Context, deal *types.ProviderDealState) error - CleanupDeal(ctx context.Context, deal *types.ProviderDealState) error - Error(ctx context.Context, deal *types.ProviderDealState, err error) error + UnsealData(ctx context.Context, deal *mktypes.ProviderDealState) error + CancelDeal(ctx context.Context, deal *mktypes.ProviderDealState) error + CleanupDeal(ctx context.Context, deal *mktypes.ProviderDealState) error + Error(ctx context.Context, deal *mktypes.ProviderDealState, err error) error } var _ IRetrievalHandler = (*RetrievalDealHandler)(nil) type RetrievalDealHandler struct { - env ProviderDealEnvironment - retrievalDealStore repo.IRetrievalDealRepo - storageDealRepo repo.StorageDealRepo + env ProviderDealEnvironment + retrievalDealStore repo.IRetrievalDealRepo + storageDealRepo repo.StorageDealRepo + gatewayMarketClient gateway.IMarketClient + pieceStorageMgr *piecestorage.PieceStorageManager } -func NewRetrievalDealHandler(env ProviderDealEnvironment, retrievalDealStore repo.IRetrievalDealRepo, storageDealRepo repo.StorageDealRepo) IRetrievalHandler { - return &RetrievalDealHandler{env: env, retrievalDealStore: retrievalDealStore, storageDealRepo: storageDealRepo} +func NewRetrievalDealHandler(env ProviderDealEnvironment, retrievalDealStore repo.IRetrievalDealRepo, storageDealRepo repo.StorageDealRepo, gatewayMarketClient gateway.IMarketClient, pieceStorageMgr *piecestorage.PieceStorageManager) IRetrievalHandler { + return &RetrievalDealHandler{ + env: env, + retrievalDealStore: retrievalDealStore, + storageDealRepo: storageDealRepo, + gatewayMarketClient: gatewayMarketClient, + pieceStorageMgr: pieceStorageMgr, + } } -func (p *RetrievalDealHandler) UnsealData(ctx context.Context, deal *types.ProviderDealState) error { - deal.Status = rm.DealStatusUnsealing - err := p.retrievalDealStore.SaveDeal(ctx, deal) +func (p *RetrievalDealHandler) UnsealData(ctx context.Context, providerDeal *mktypes.ProviderDealState) (err error) { + log := log.With("dealId", providerDeal.ID) + providerDeal.Status = rm.DealStatusUnsealing + err = p.retrievalDealStore.SaveDeal(ctx, providerDeal) if err != nil { - return err + return } - storageDeal, err := p.storageDealRepo.GetDeal(ctx, deal.SelStorageProposalCid) + deal, err := p.storageDealRepo.GetDeal(ctx, providerDeal.SelStorageProposalCid) if err != nil { - return err + return } - if err := p.env.PrepareBlockstore(ctx, deal.ID, storageDeal.Proposal.PieceCID); err != nil { - log.Errorf("unable to load shard %s %w", storageDeal.Proposal.PieceCID, err) - return p.CancelDeal(ctx, deal) + pieceCid := deal.Proposal.PieceCID + log = log.With("pieceCid", pieceCid) + + // check piece exist + + st, err := p.pieceStorageMgr.FindStorageForRead(ctx, pieceCid.String()) + if err != nil { + // check fail, but unseal should continue + log.Infof("try to find piece fail: %w", err) } - log.Debugf("blockstore prepared successfully, firing unseal complete for deal %d", deal.ID) - deal.Status = rm.DealStatusUnsealed - err = p.retrievalDealStore.SaveDeal(ctx, deal) + + if st != nil { + log.Info("piece already exist, no need to unseal") + } else { + // try unseal + var wps piecestorage.IPieceStorage + wps, err = p.pieceStorageMgr.FindStorageForWrite(int64(deal.Proposal.PieceSize)) + if err != nil { + err = fmt.Errorf("failed to find storage to write %s: %w", deal.Proposal.PieceCID, err) + return + } + + var pieceTransfer string + pieceTransfer, err = wps.GetPieceTransfer(ctx, pieceCid.String()) + if err != nil { + err = fmt.Errorf("get piece transfer for %s: %w", pieceCid, err) + return + } + + log.Info("try to unseal") + // should block util unseal finish or error, because it will resume transfer later + state := gtypes.UnsealStateFailed + checkUnsealInterval := 5 * time.Minute + ticker := time.NewTicker(checkUnsealInterval) + defer ticker.Stop() + timeOutCtx, cancel := context.WithTimeout(ctx, 12*time.Hour) + defer cancel() + + errRetry, errRetryCount := 5, 0 + + CheckLoop: + for state != gtypes.UnsealStateFinished { + state, err = p.gatewayMarketClient.SectorsUnsealPiece( + ctx, + deal.Proposal.Provider, + pieceCid, + deal.SectorNumber, + vtypes.UnpaddedByteIndex(deal.Offset.Unpadded()), + deal.Proposal.PieceSize.Unpadded(), + pieceTransfer, + ) + if err != nil { + err = fmt.Errorf("unseal piece %s: %w", pieceCid, err) + errRetryCount++ + log.Warnf("unseal piece %s fail, retry (%d/%d): %w", pieceCid, errRetryCount, errRetry, err) + if errRetryCount > errRetry { + return + } + } + log.Debugf("unseal piece %s: %s", pieceCid, state) + switch state { + case gtypes.UnsealStateFailed: + err = fmt.Errorf("unseal piece %s fail: %w", pieceCid, err) + return + case gtypes.UnsealStateFinished: + break CheckLoop + } + select { + case <-ticker.C: + case <-timeOutCtx.Done(): + err = ctx.Err() + return + } + } + log.Info("unseal piece success") + } + + if err = p.env.PrepareBlockstore(ctx, providerDeal.ID, deal.Proposal.PieceCID); err != nil { + log.Errorf("unable to load shard %s %s", deal.Proposal.PieceCID, err.Error()) + err = p.CancelDeal(ctx, providerDeal) + return + } + log.Debugf("blockstore prepared successfully, firing unseal complete for deal %d", providerDeal.ID) + providerDeal.Status = rm.DealStatusUnsealed + err = p.retrievalDealStore.SaveDeal(ctx, providerDeal) if err != nil { - return err + return } - log.Debugf("unpausing data transfer for deal %d", deal.ID) + log.Debugf("unpausing data transfer for deal %d", providerDeal.ID) - if deal.ChannelID != nil { - log.Debugf("resuming data transfer for deal %d", deal.ID) - err = p.env.ResumeDataTransfer(ctx, *deal.ChannelID) + if providerDeal.ChannelID != nil { + log.Debugf("resuming data transfer for deal %d", providerDeal.ID) + err = p.env.ResumeDataTransfer(ctx, *providerDeal.ChannelID) if err != nil { - deal.Status = rm.DealStatusErrored + providerDeal.Status = rm.DealStatusErrored } } - return p.retrievalDealStore.SaveDeal(ctx, deal) + err = p.retrievalDealStore.SaveDeal(ctx, providerDeal) + return } -func (p *RetrievalDealHandler) CancelDeal(ctx context.Context, deal *types.ProviderDealState) error { +func (p *RetrievalDealHandler) CancelDeal(ctx context.Context, deal *mktypes.ProviderDealState) error { // Read next response (or fail) err := p.env.DeleteStore(deal.ID) if err != nil { @@ -82,7 +176,7 @@ func (p *RetrievalDealHandler) CancelDeal(ctx context.Context, deal *types.Provi } // CleanupDeal runs to do memory cleanup for an in progress deal -func (p *RetrievalDealHandler) CleanupDeal(ctx context.Context, deal *types.ProviderDealState) error { +func (p *RetrievalDealHandler) CleanupDeal(ctx context.Context, deal *mktypes.ProviderDealState) error { err := p.env.DeleteStore(deal.ID) if err != nil { return p.Error(ctx, deal, nil) @@ -91,7 +185,7 @@ func (p *RetrievalDealHandler) CleanupDeal(ctx context.Context, deal *types.Prov return p.retrievalDealStore.SaveDeal(ctx, deal) } -func (p *RetrievalDealHandler) Error(ctx context.Context, deal *types.ProviderDealState, err error) error { +func (p *RetrievalDealHandler) Error(ctx context.Context, deal *mktypes.ProviderDealState, err error) error { deal.Status = rm.DealStatusErrored if err != nil { deal.Message = err.Error() diff --git a/retrievalprovider/revalidator.go b/retrievalprovider/revalidator.go index f36916e6..ee20d4bf 100644 --- a/retrievalprovider/revalidator.go +++ b/retrievalprovider/revalidator.go @@ -124,7 +124,12 @@ func (pr *ProviderRevalidator) processPayment(ctx context.Context, deal *types.P // pay for unseal goto unseal deal.Status = retrievalmarket.DealStatusUnsealing defer func() { - go pr.retrievalDealHandler.UnsealData(ctx, deal) //nolint + go func() { + err := pr.retrievalDealHandler.UnsealData(ctx, deal) //nolint + if err != nil { + log.Errorf("provider: unable to unseal data %v", err) + } + }() }() err = nil case retrievalmarket.DealStatusUnsealing: