From 21f5856ddfd7d701fb830ac7982b6e83fd649244 Mon Sep 17 00:00:00 2001 From: Tiffany Harris Date: Wed, 30 Sep 2020 15:53:36 -0400 Subject: [PATCH] [FAB-18082] Make RemoveChannel crash fault tolerant Signed-off-by: Tiffany Harris --- .../ledger/blockledger/fileledger/factory.go | 38 +++++- .../blockledger/fileledger/factory_test.go | 128 +++++++++++++++++- orderer/common/filerepo/filerepo.go | 2 +- orderer/common/filerepo/filerepo_test.go | 5 +- 4 files changed, 161 insertions(+), 12 deletions(-) diff --git a/common/ledger/blockledger/fileledger/factory.go b/common/ledger/blockledger/fileledger/factory.go index 6815f1351b5..31ec1f185fc 100644 --- a/common/ledger/blockledger/fileledger/factory.go +++ b/common/ledger/blockledger/fileledger/factory.go @@ -7,11 +7,14 @@ SPDX-License-Identifier: Apache-2.0 package fileledger import ( + "os" + "path/filepath" "sync" "github.com/hyperledger/fabric/common/ledger/blkstorage" "github.com/hyperledger/fabric/common/ledger/blockledger" "github.com/hyperledger/fabric/common/metrics" + "github.com/hyperledger/fabric/orderer/common/filerepo" ) //go:generate counterfeiter -o mock/block_store_provider.go --fake-name BlockStoreProvider . blockStoreProvider @@ -26,6 +29,7 @@ type fileLedgerFactory struct { blkstorageProvider blockStoreProvider ledgers map[string]*FileLedger mutex sync.Mutex + removeFileRepo *filerepo.Repo } // GetOrCreate gets an existing ledger (if it exists) or creates it @@ -62,6 +66,10 @@ func (f *fileLedgerFactory) Remove(channelID string) error { ledger.blockStore.Shutdown() } + if err := f.removeFileRepo.Save(channelID, []byte{}); err != nil && err != os.ErrExist { + return err + } + err := f.blkstorageProvider.Drop(channelID) if err != nil { return err @@ -69,6 +77,10 @@ func (f *fileLedgerFactory) Remove(channelID string) error { delete(f.ledgers, channelID) + if err := f.removeFileRepo.Remove(channelID); err != nil { + return err + } + return nil } @@ -97,8 +109,30 @@ func New(directory string, metricsProvider metrics.Provider) (blockledger.Factor if err != nil { return nil, err } - return &fileLedgerFactory{ + + fileRepo, err := filerepo.New(filepath.Join(directory, "filerepo"), "remove") + if err != nil { + return nil, err + } + + factory := &fileLedgerFactory{ blkstorageProvider: p, ledgers: map[string]*FileLedger{}, - }, nil + removeFileRepo: fileRepo, + } + + files, err := factory.removeFileRepo.List() + if len(files) != 0 { + for _, fileName := range files { + channelID := factory.removeFileRepo.FileToBaseName(fileName) + err = factory.Remove(channelID) + if err != nil { + logger.Errorf("Failed to remove channel %s: %s", channelID, err.Error()) + return nil, err + } + logger.Infof("Removed channel: %s", channelID) + } + } + + return factory, nil } diff --git a/common/ledger/blockledger/fileledger/factory_test.go b/common/ledger/blockledger/fileledger/factory_test.go index 409d4954621..d1efe85dd6d 100644 --- a/common/ledger/blockledger/fileledger/factory_test.go +++ b/common/ledger/blockledger/fileledger/factory_test.go @@ -8,12 +8,15 @@ package fileledger import ( "errors" + "fmt" "io/ioutil" "os" + "path/filepath" "testing" "github.com/hyperledger/fabric/common/ledger/blockledger/fileledger/mock" "github.com/hyperledger/fabric/common/metrics/disabled" + "github.com/hyperledger/fabric/orderer/common/filerepo" "github.com/stretchr/testify/require" ) @@ -24,17 +27,19 @@ type fileLedgerBlockStore interface { } func TestBlockStoreProviderErrors(t *testing.T) { - setup := func() (*fileLedgerFactory, *mock.BlockStoreProvider) { + setup := func(fileRepo *filerepo.Repo) (*fileLedgerFactory, *mock.BlockStoreProvider) { m := &mock.BlockStoreProvider{} + f := &fileLedgerFactory{ blkstorageProvider: m, ledgers: map[string]*FileLedger{}, + removeFileRepo: fileRepo, } return f, m } t.Run("list", func(t *testing.T) { - f, mockBlockStoreProvider := setup() + f, mockBlockStoreProvider := setup(nil) mockBlockStoreProvider.ListReturns(nil, errors.New("boogie")) require.PanicsWithValue( t, @@ -45,7 +50,7 @@ func TestBlockStoreProviderErrors(t *testing.T) { }) t.Run("open", func(t *testing.T) { - f, mockBlockStoreProvider := setup() + f, mockBlockStoreProvider := setup(nil) mockBlockStoreProvider.OpenReturns(nil, errors.New("woogie")) _, err := f.GetOrCreate("foo") require.EqualError(t, err, "woogie") @@ -53,8 +58,14 @@ func TestBlockStoreProviderErrors(t *testing.T) { }) t.Run("remove", func(t *testing.T) { + dir, err := ioutil.TempDir("", "fileledger") + require.NoError(t, err, "Error creating temp dir: %s", err) + defer os.RemoveAll(dir) + fileRepo, err := filerepo.New(filepath.Join(dir, "filerepo"), "remove") + require.NoError(t, err, "Error creating temp file repo: %s", err) + t.Run("ledger doesn't exist", func(t *testing.T) { - f, mockBlockStoreProvider := setup() + f, mockBlockStoreProvider := setup(fileRepo) err := f.Remove("foo") require.NoError(t, err) require.Equal(t, 1, mockBlockStoreProvider.DropCallCount()) @@ -63,7 +74,7 @@ func TestBlockStoreProviderErrors(t *testing.T) { }) t.Run("dropping the blockstore fails", func(t *testing.T) { - f, mockBlockStoreProvider := setup() + f, mockBlockStoreProvider := setup(fileRepo) mockBlockStore := &mock.FileLedgerBlockStore{} f.ledgers["bar"] = &FileLedger{blockStore: mockBlockStore} mockBlockStoreProvider.DropReturns(errors.New("oogie")) @@ -106,13 +117,120 @@ func TestMultiReinitialization(t *testing.T) { require.Equal(t, 3, len(f.ChannelIDs()), "Expected channel to be recovered") f.Close() + bar2FileRepoDir := filepath.Join(dir, "filerepo", "remove", "bar2.remove") + _, err = os.Create(bar2FileRepoDir) + require.NoError(t, err, "Error creating temp file: %s", err) + + bar2ChainsDir := filepath.Join(dir, "chains", "bar2") + err = os.MkdirAll(bar2ChainsDir, 0700) + require.NoError(t, err, "Error creating temp dir: %s", err) + _, err = os.Create(filepath.Join(bar2ChainsDir, "blockfile_000000")) + require.NoError(t, err, "Error creating temp file: %s", err) + f, err = New(dir, metricsProvider) require.NoError(t, err) + err = f.Remove("bar") require.NoError(t, err, "Error removing channel") require.Equal(t, 2, len(f.ChannelIDs())) err = f.Remove("this-isnt-an-existing-channel") require.NoError(t, err, "Error removing channel") require.Equal(t, 2, len(f.ChannelIDs())) + + _, err = os.Stat(bar2ChainsDir) + require.EqualError(t, err, fmt.Sprintf("stat %s: no such file or directory", bar2ChainsDir)) + + _, err = os.Stat(bar2FileRepoDir) + require.EqualError(t, err, fmt.Sprintf("stat %s: no such file or directory", bar2FileRepoDir)) f.Close() } + +func TestNewErrors(t *testing.T) { + metricsProvider := &disabled.Provider{} + + t.Run("creation of filerepo fails", func(t *testing.T) { + dir, err := ioutil.TempDir("", "fileledger") + require.NoError(t, err, "Error creating temp dir: %s", err) + defer os.RemoveAll(dir) + + fileRepoDir := filepath.Join(dir, "filerepo", "remove") + err = os.MkdirAll(fileRepoDir, 0700) + require.NoError(t, err, "Error creating temp dir: %s", err) + removeFile := filepath.Join(fileRepoDir, "rojo.remove") + _, err = os.Create(removeFile) + require.NoError(t, err, "Error creating temp file: %s", err) + err = os.Chmod(removeFile, 0444) + err = os.Chmod(filepath.Join(dir, "filerepo", "remove"), 0444) + require.NoError(t, err, "Error changing permissions of temp file: %s", err) + + _, err = New(dir, metricsProvider) + require.EqualError(t, err, fmt.Sprintf("error checking if dir [%s] is empty: lstat %s: permission denied", fileRepoDir, removeFile)) + }) + + t.Run("removal fails", func(t *testing.T) { + dir, err := ioutil.TempDir("", "fileledger") + require.NoError(t, err, "Error creating temp dir: %s", err) + defer os.RemoveAll(dir) + + fileRepoDir := filepath.Join(dir, "filerepo", "remove") + err = os.MkdirAll(fileRepoDir, 0777) + require.NoError(t, err, "Error creating temp dir: %s", err) + removeFile := filepath.Join(fileRepoDir, "rojo.remove") + _, err = os.Create(removeFile) + require.NoError(t, err, "Error creating temp file: %s", err) + err = os.Chmod(removeFile, 0444) + err = os.Chmod(filepath.Join(dir, "filerepo", "remove"), 0544) + require.NoError(t, err, "Error changing permissions of temp file: %s", err) + + _, err = New(dir, metricsProvider) + require.EqualError(t, err, fmt.Sprintf("unlinkat %s: permission denied", removeFile)) + }) +} + +func TestRemove(t *testing.T) { + mockBlockStore := &mock.BlockStoreProvider{} + dir, err := ioutil.TempDir("", "fileledger") + require.NoError(t, err, "Error creating temp dir: %s", err) + defer os.RemoveAll(dir) + + fileRepo, err := filerepo.New(filepath.Join(dir, "filerepo"), "remove") + require.NoError(t, err, "Error creating temp file repo: %s", err) + f := &fileLedgerFactory{ + blkstorageProvider: mockBlockStore, + ledgers: map[string]*FileLedger{}, + removeFileRepo: fileRepo, + } + defer f.Close() + + t.Run("success", func(t *testing.T) { + dest := filepath.Join(dir, "filerepo", "remove", "foo.remove") + mockBlockStore.DropCalls(func(string) error { + _, err = os.Stat(dest) + require.NoError(t, err, "Expected foo.remove to exist") + return nil + }) + err = f.Remove("foo") + require.NoError(t, err, "Error removing channel") + require.Equal(t, 1, mockBlockStore.DropCallCount(), "Expected 1 Drop() calls") + + _, err = os.Stat(dest) + require.EqualError(t, err, fmt.Sprintf("stat %s: no such file or directory", dest)) + }) + + t.Run("drop fails", func(t *testing.T) { + mockBlockStore.DropReturns(errors.New("oogie")) + err = f.Remove("foo") + require.EqualError(t, err, "oogie") + + dest := filepath.Join(dir, "filerepo", "remove", "foo.remove") + _, err = os.Stat(dest) + require.NoError(t, err, "Expected foo.remove to exist") + }) + + t.Run("saving to file repo fails", func(t *testing.T) { + os.RemoveAll(dir) + mockBlockStore.DropReturns(nil) + err = f.Remove("foo") + require.EqualError(t, err, fmt.Sprintf("error while creating file:%s/filerepo/remove/foo.remove~: open %s/filerepo/remove/foo.remove~: no such file or directory", dir, dir)) + }) +} diff --git a/orderer/common/filerepo/filerepo.go b/orderer/common/filerepo/filerepo.go index 260a9302cda..f16b361f021 100644 --- a/orderer/common/filerepo/filerepo.go +++ b/orderer/common/filerepo/filerepo.go @@ -90,7 +90,7 @@ func (r *Repo) Save(baseName string, content []byte) error { dest := r.baseToFilePath(baseName) if _, err := os.Stat(dest); err == nil { - return errors.Errorf("file already exists at %s", dest) + return os.ErrExist } tmpFileName := fileName + r.transientFileMarker diff --git a/orderer/common/filerepo/filerepo_test.go b/orderer/common/filerepo/filerepo_test.go index 1f49024d323..b1831fd9e79 100644 --- a/orderer/common/filerepo/filerepo_test.go +++ b/orderer/common/filerepo/filerepo_test.go @@ -7,7 +7,6 @@ SPDX-License-Identifier: Apache-2.0 package filerepo_test import ( - "fmt" "io/ioutil" "os" "path/filepath" @@ -113,10 +112,8 @@ func TestFileRepo_SaveFailure(t *testing.T) { r, err := filerepo.New("testdata", "joinblock") require.NoError(t, err) - filePath := filepath.Join("testdata", "joinblock", "mychannel.joinblock") - err = r.Save("mychannel", []byte{}) - require.EqualError(t, err, fmt.Sprintf("file already exists at %s", filePath)) + require.EqualError(t, err, os.ErrExist.Error()) } func TestFileRepo_Remove(t *testing.T) {