diff --git a/common/ledger/blockledger/fileledger/factory.go b/common/ledger/blockledger/fileledger/factory.go index ec1297e1ce8..6815f1351b5 100644 --- a/common/ledger/blockledger/fileledger/factory.go +++ b/common/ledger/blockledger/fileledger/factory.go @@ -24,7 +24,7 @@ type blockStoreProvider interface { type fileLedgerFactory struct { blkstorageProvider blockStoreProvider - ledgers map[string]blockledger.ReadWriter + ledgers map[string]*FileLedger mutex sync.Mutex } @@ -55,6 +55,13 @@ func (f *fileLedgerFactory) Remove(channelID string) error { f.mutex.Lock() defer f.mutex.Unlock() + // check cache for open blockstore and, if one exists, + // shut it down in order to avoid resource contention + ledger, ok := f.ledgers[channelID] + if ok { + ledger.blockStore.Shutdown() + } + err := f.blkstorageProvider.Drop(channelID) if err != nil { return err @@ -92,6 +99,6 @@ func New(directory string, metricsProvider metrics.Provider) (blockledger.Factor } return &fileLedgerFactory{ blkstorageProvider: p, - ledgers: make(map[string]blockledger.ReadWriter), + ledgers: map[string]*FileLedger{}, }, nil } diff --git a/common/ledger/blockledger/fileledger/factory_test.go b/common/ledger/blockledger/fileledger/factory_test.go index ecb9193036f..800802a83f6 100644 --- a/common/ledger/blockledger/fileledger/factory_test.go +++ b/common/ledger/blockledger/fileledger/factory_test.go @@ -12,21 +12,26 @@ import ( "os" "testing" - "github.com/hyperledger/fabric/common/ledger/blockledger" "github.com/hyperledger/fabric/common/ledger/blockledger/fileledger/mock" "github.com/hyperledger/fabric/common/metrics/disabled" "github.com/stretchr/testify/require" ) +//go:generate counterfeiter -o mock/file_ledger_block_store.go --fake-name FileLedgerBlockStore . fileLedgerBlockStore + +type fileLedgerBlockStore interface { + FileLedgerBlockStore +} + func TestBlockStoreProviderErrors(t *testing.T) { - mockBlockStore := &mock.BlockStoreProvider{} + mockBlockStoreProvider := &mock.BlockStoreProvider{} f := &fileLedgerFactory{ - blkstorageProvider: mockBlockStore, - ledgers: map[string]blockledger.ReadWriter{}, + blkstorageProvider: mockBlockStoreProvider, + ledgers: map[string]*FileLedger{}, } t.Run("list", func(t *testing.T) { - mockBlockStore.ListReturns(nil, errors.New("boogie")) + mockBlockStoreProvider.ListReturns(nil, errors.New("boogie")) require.PanicsWithValue( t, "boogie", @@ -36,16 +41,29 @@ func TestBlockStoreProviderErrors(t *testing.T) { }) t.Run("open", func(t *testing.T) { - mockBlockStore.OpenReturns(nil, errors.New("woogie")) + mockBlockStoreProvider.OpenReturns(nil, errors.New("woogie")) _, err := f.GetOrCreate("foo") require.EqualError(t, err, "woogie") require.Empty(t, f.ledgers, "Expected no new ledger is created") }) t.Run("remove", func(t *testing.T) { - mockBlockStore.DropReturns(errors.New("oogie")) - err := f.Remove("foo") - require.EqualError(t, err, "oogie") + t.Run("ledger doesn't exist", func(t *testing.T) { + err := f.Remove("foo") + require.NoError(t, err) + require.Equal(t, 1, mockBlockStoreProvider.DropCallCount()) + }) + + t.Run("dropping the blockstore fails", func(t *testing.T) { + mockBlockStore := &mock.FileLedgerBlockStore{} + f.ledgers["foo"] = &FileLedger{blockStore: mockBlockStore} + mockBlockStoreProvider.DropReturns(errors.New("oogie")) + + err := f.Remove("foo") + require.EqualError(t, err, "oogie") + require.Equal(t, 1, mockBlockStore.ShutdownCallCount()) + require.Equal(t, 2, mockBlockStoreProvider.DropCallCount()) + }) }) } diff --git a/common/ledger/blockledger/fileledger/impl.go b/common/ledger/blockledger/fileledger/impl.go index 25067b11558..f6044de865b 100644 --- a/common/ledger/blockledger/fileledger/impl.go +++ b/common/ledger/blockledger/fileledger/impl.go @@ -28,6 +28,7 @@ type FileLedgerBlockStore interface { AddBlock(block *cb.Block) error GetBlockchainInfo() (*cb.BlockchainInfo, error) RetrieveBlocks(startBlockNumber uint64) (ledger.ResultsIterator, error) + Shutdown() } // NewFileLedger creates a new FileLedger for interaction with the ledger diff --git a/common/ledger/blockledger/fileledger/mock/file_ledger_block_store.go b/common/ledger/blockledger/fileledger/mock/file_ledger_block_store.go new file mode 100644 index 00000000000..1018249dc6b --- /dev/null +++ b/common/ledger/blockledger/fileledger/mock/file_ledger_block_store.go @@ -0,0 +1,285 @@ +// Code generated by counterfeiter. DO NOT EDIT. +package mock + +import ( + "sync" + + "github.com/hyperledger/fabric-protos-go/common" + "github.com/hyperledger/fabric/common/ledger" +) + +type FileLedgerBlockStore struct { + AddBlockStub func(*common.Block) error + addBlockMutex sync.RWMutex + addBlockArgsForCall []struct { + arg1 *common.Block + } + addBlockReturns struct { + result1 error + } + addBlockReturnsOnCall map[int]struct { + result1 error + } + GetBlockchainInfoStub func() (*common.BlockchainInfo, error) + getBlockchainInfoMutex sync.RWMutex + getBlockchainInfoArgsForCall []struct { + } + getBlockchainInfoReturns struct { + result1 *common.BlockchainInfo + result2 error + } + getBlockchainInfoReturnsOnCall map[int]struct { + result1 *common.BlockchainInfo + result2 error + } + RetrieveBlocksStub func(uint64) (ledger.ResultsIterator, error) + retrieveBlocksMutex sync.RWMutex + retrieveBlocksArgsForCall []struct { + arg1 uint64 + } + retrieveBlocksReturns struct { + result1 ledger.ResultsIterator + result2 error + } + retrieveBlocksReturnsOnCall map[int]struct { + result1 ledger.ResultsIterator + result2 error + } + ShutdownStub func() + shutdownMutex sync.RWMutex + shutdownArgsForCall []struct { + } + invocations map[string][][]interface{} + invocationsMutex sync.RWMutex +} + +func (fake *FileLedgerBlockStore) AddBlock(arg1 *common.Block) error { + fake.addBlockMutex.Lock() + ret, specificReturn := fake.addBlockReturnsOnCall[len(fake.addBlockArgsForCall)] + fake.addBlockArgsForCall = append(fake.addBlockArgsForCall, struct { + arg1 *common.Block + }{arg1}) + fake.recordInvocation("AddBlock", []interface{}{arg1}) + fake.addBlockMutex.Unlock() + if fake.AddBlockStub != nil { + return fake.AddBlockStub(arg1) + } + if specificReturn { + return ret.result1 + } + fakeReturns := fake.addBlockReturns + return fakeReturns.result1 +} + +func (fake *FileLedgerBlockStore) AddBlockCallCount() int { + fake.addBlockMutex.RLock() + defer fake.addBlockMutex.RUnlock() + return len(fake.addBlockArgsForCall) +} + +func (fake *FileLedgerBlockStore) AddBlockCalls(stub func(*common.Block) error) { + fake.addBlockMutex.Lock() + defer fake.addBlockMutex.Unlock() + fake.AddBlockStub = stub +} + +func (fake *FileLedgerBlockStore) AddBlockArgsForCall(i int) *common.Block { + fake.addBlockMutex.RLock() + defer fake.addBlockMutex.RUnlock() + argsForCall := fake.addBlockArgsForCall[i] + return argsForCall.arg1 +} + +func (fake *FileLedgerBlockStore) AddBlockReturns(result1 error) { + fake.addBlockMutex.Lock() + defer fake.addBlockMutex.Unlock() + fake.AddBlockStub = nil + fake.addBlockReturns = struct { + result1 error + }{result1} +} + +func (fake *FileLedgerBlockStore) AddBlockReturnsOnCall(i int, result1 error) { + fake.addBlockMutex.Lock() + defer fake.addBlockMutex.Unlock() + fake.AddBlockStub = nil + if fake.addBlockReturnsOnCall == nil { + fake.addBlockReturnsOnCall = make(map[int]struct { + result1 error + }) + } + fake.addBlockReturnsOnCall[i] = struct { + result1 error + }{result1} +} + +func (fake *FileLedgerBlockStore) GetBlockchainInfo() (*common.BlockchainInfo, error) { + fake.getBlockchainInfoMutex.Lock() + ret, specificReturn := fake.getBlockchainInfoReturnsOnCall[len(fake.getBlockchainInfoArgsForCall)] + fake.getBlockchainInfoArgsForCall = append(fake.getBlockchainInfoArgsForCall, struct { + }{}) + fake.recordInvocation("GetBlockchainInfo", []interface{}{}) + fake.getBlockchainInfoMutex.Unlock() + if fake.GetBlockchainInfoStub != nil { + return fake.GetBlockchainInfoStub() + } + if specificReturn { + return ret.result1, ret.result2 + } + fakeReturns := fake.getBlockchainInfoReturns + return fakeReturns.result1, fakeReturns.result2 +} + +func (fake *FileLedgerBlockStore) GetBlockchainInfoCallCount() int { + fake.getBlockchainInfoMutex.RLock() + defer fake.getBlockchainInfoMutex.RUnlock() + return len(fake.getBlockchainInfoArgsForCall) +} + +func (fake *FileLedgerBlockStore) GetBlockchainInfoCalls(stub func() (*common.BlockchainInfo, error)) { + fake.getBlockchainInfoMutex.Lock() + defer fake.getBlockchainInfoMutex.Unlock() + fake.GetBlockchainInfoStub = stub +} + +func (fake *FileLedgerBlockStore) GetBlockchainInfoReturns(result1 *common.BlockchainInfo, result2 error) { + fake.getBlockchainInfoMutex.Lock() + defer fake.getBlockchainInfoMutex.Unlock() + fake.GetBlockchainInfoStub = nil + fake.getBlockchainInfoReturns = struct { + result1 *common.BlockchainInfo + result2 error + }{result1, result2} +} + +func (fake *FileLedgerBlockStore) GetBlockchainInfoReturnsOnCall(i int, result1 *common.BlockchainInfo, result2 error) { + fake.getBlockchainInfoMutex.Lock() + defer fake.getBlockchainInfoMutex.Unlock() + fake.GetBlockchainInfoStub = nil + if fake.getBlockchainInfoReturnsOnCall == nil { + fake.getBlockchainInfoReturnsOnCall = make(map[int]struct { + result1 *common.BlockchainInfo + result2 error + }) + } + fake.getBlockchainInfoReturnsOnCall[i] = struct { + result1 *common.BlockchainInfo + result2 error + }{result1, result2} +} + +func (fake *FileLedgerBlockStore) RetrieveBlocks(arg1 uint64) (ledger.ResultsIterator, error) { + fake.retrieveBlocksMutex.Lock() + ret, specificReturn := fake.retrieveBlocksReturnsOnCall[len(fake.retrieveBlocksArgsForCall)] + fake.retrieveBlocksArgsForCall = append(fake.retrieveBlocksArgsForCall, struct { + arg1 uint64 + }{arg1}) + fake.recordInvocation("RetrieveBlocks", []interface{}{arg1}) + fake.retrieveBlocksMutex.Unlock() + if fake.RetrieveBlocksStub != nil { + return fake.RetrieveBlocksStub(arg1) + } + if specificReturn { + return ret.result1, ret.result2 + } + fakeReturns := fake.retrieveBlocksReturns + return fakeReturns.result1, fakeReturns.result2 +} + +func (fake *FileLedgerBlockStore) RetrieveBlocksCallCount() int { + fake.retrieveBlocksMutex.RLock() + defer fake.retrieveBlocksMutex.RUnlock() + return len(fake.retrieveBlocksArgsForCall) +} + +func (fake *FileLedgerBlockStore) RetrieveBlocksCalls(stub func(uint64) (ledger.ResultsIterator, error)) { + fake.retrieveBlocksMutex.Lock() + defer fake.retrieveBlocksMutex.Unlock() + fake.RetrieveBlocksStub = stub +} + +func (fake *FileLedgerBlockStore) RetrieveBlocksArgsForCall(i int) uint64 { + fake.retrieveBlocksMutex.RLock() + defer fake.retrieveBlocksMutex.RUnlock() + argsForCall := fake.retrieveBlocksArgsForCall[i] + return argsForCall.arg1 +} + +func (fake *FileLedgerBlockStore) RetrieveBlocksReturns(result1 ledger.ResultsIterator, result2 error) { + fake.retrieveBlocksMutex.Lock() + defer fake.retrieveBlocksMutex.Unlock() + fake.RetrieveBlocksStub = nil + fake.retrieveBlocksReturns = struct { + result1 ledger.ResultsIterator + result2 error + }{result1, result2} +} + +func (fake *FileLedgerBlockStore) RetrieveBlocksReturnsOnCall(i int, result1 ledger.ResultsIterator, result2 error) { + fake.retrieveBlocksMutex.Lock() + defer fake.retrieveBlocksMutex.Unlock() + fake.RetrieveBlocksStub = nil + if fake.retrieveBlocksReturnsOnCall == nil { + fake.retrieveBlocksReturnsOnCall = make(map[int]struct { + result1 ledger.ResultsIterator + result2 error + }) + } + fake.retrieveBlocksReturnsOnCall[i] = struct { + result1 ledger.ResultsIterator + result2 error + }{result1, result2} +} + +func (fake *FileLedgerBlockStore) Shutdown() { + fake.shutdownMutex.Lock() + fake.shutdownArgsForCall = append(fake.shutdownArgsForCall, struct { + }{}) + fake.recordInvocation("Shutdown", []interface{}{}) + fake.shutdownMutex.Unlock() + if fake.ShutdownStub != nil { + fake.ShutdownStub() + } +} + +func (fake *FileLedgerBlockStore) ShutdownCallCount() int { + fake.shutdownMutex.RLock() + defer fake.shutdownMutex.RUnlock() + return len(fake.shutdownArgsForCall) +} + +func (fake *FileLedgerBlockStore) ShutdownCalls(stub func()) { + fake.shutdownMutex.Lock() + defer fake.shutdownMutex.Unlock() + fake.ShutdownStub = stub +} + +func (fake *FileLedgerBlockStore) Invocations() map[string][][]interface{} { + fake.invocationsMutex.RLock() + defer fake.invocationsMutex.RUnlock() + fake.addBlockMutex.RLock() + defer fake.addBlockMutex.RUnlock() + fake.getBlockchainInfoMutex.RLock() + defer fake.getBlockchainInfoMutex.RUnlock() + fake.retrieveBlocksMutex.RLock() + defer fake.retrieveBlocksMutex.RUnlock() + fake.shutdownMutex.RLock() + defer fake.shutdownMutex.RUnlock() + copiedInvocations := map[string][][]interface{}{} + for key, value := range fake.invocations { + copiedInvocations[key] = value + } + return copiedInvocations +} + +func (fake *FileLedgerBlockStore) recordInvocation(key string, args []interface{}) { + fake.invocationsMutex.Lock() + defer fake.invocationsMutex.Unlock() + if fake.invocations == nil { + fake.invocations = map[string][][]interface{}{} + } + if fake.invocations[key] == nil { + fake.invocations[key] = [][]interface{}{} + } + fake.invocations[key] = append(fake.invocations[key], args) +} diff --git a/core/peer/peer.go b/core/peer/peer.go index 9eb44e4567d..528ccb3f714 100644 --- a/core/peer/peer.go +++ b/core/peer/peer.go @@ -148,6 +148,8 @@ func (flbs fileLedgerBlockStore) RetrieveBlocks(startBlockNumber uint64) (common return flbs.GetBlocksIterator(startBlockNumber) } +func (flbs fileLedgerBlockStore) Shutdown() {} + // NewConfigSupport returns func NewConfigSupport(peer *Peer) cc.Manager { return &configSupport{