Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FAB-18082] Make RemoveChannel crash fault tolerant #1952

Merged
merged 1 commit into from
Oct 21, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 36 additions & 2 deletions common/ledger/blockledger/fileledger/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,14 @@ SPDX-License-Identifier: Apache-2.0
package fileledger

import (
"os"
"path/filepath"
"sync"

"github.com/hyperledger/fabric/common/ledger/blkstorage"
"github.com/hyperledger/fabric/common/ledger/blockledger"
"github.com/hyperledger/fabric/common/metrics"
"github.com/hyperledger/fabric/orderer/common/filerepo"
)

//go:generate counterfeiter -o mock/block_store_provider.go --fake-name BlockStoreProvider . blockStoreProvider
Expand All @@ -26,6 +29,7 @@ type fileLedgerFactory struct {
blkstorageProvider blockStoreProvider
ledgers map[string]*FileLedger
mutex sync.Mutex
removeFileRepo *filerepo.Repo
}

// GetOrCreate gets an existing ledger (if it exists) or creates it
Expand Down Expand Up @@ -62,13 +66,21 @@ func (f *fileLedgerFactory) Remove(channelID string) error {
ledger.blockStore.Shutdown()
}

if err := f.removeFileRepo.Save(channelID, []byte{}); err != nil && err != os.ErrExist {
return err
}

err := f.blkstorageProvider.Drop(channelID)
if err != nil {
return err
}

delete(f.ledgers, channelID)

if err := f.removeFileRepo.Remove(channelID); err != nil {
return err
}

return nil
}

Expand Down Expand Up @@ -97,8 +109,30 @@ func New(directory string, metricsProvider metrics.Provider) (blockledger.Factor
if err != nil {
return nil, err
}
return &fileLedgerFactory{

fileRepo, err := filerepo.New(filepath.Join(directory, "filerepo"), "remove")
stephyee marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return nil, err
}

factory := &fileLedgerFactory{
blkstorageProvider: p,
ledgers: map[string]*FileLedger{},
}, nil
removeFileRepo: fileRepo,
}

files, err := factory.removeFileRepo.List()
if len(files) != 0 {
for _, fileName := range files {
channelID := factory.removeFileRepo.FileToBaseName(fileName)
err = factory.Remove(channelID)
if err != nil {
logger.Errorf("Failed to remove channel %s: %s", channelID, err.Error())
return nil, err
}
logger.Infof("Removed channel: %s", channelID)
}
}

return factory, nil
}
128 changes: 123 additions & 5 deletions common/ledger/blockledger/fileledger/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,15 @@ package fileledger

import (
"errors"
"fmt"
"io/ioutil"
"os"
"path/filepath"
"testing"

"github.com/hyperledger/fabric/common/ledger/blockledger/fileledger/mock"
"github.com/hyperledger/fabric/common/metrics/disabled"
"github.com/hyperledger/fabric/orderer/common/filerepo"
"github.com/stretchr/testify/require"
)

Expand All @@ -24,17 +27,19 @@ type fileLedgerBlockStore interface {
}

func TestBlockStoreProviderErrors(t *testing.T) {
setup := func() (*fileLedgerFactory, *mock.BlockStoreProvider) {
setup := func(fileRepo *filerepo.Repo) (*fileLedgerFactory, *mock.BlockStoreProvider) {
m := &mock.BlockStoreProvider{}

f := &fileLedgerFactory{
blkstorageProvider: m,
ledgers: map[string]*FileLedger{},
removeFileRepo: fileRepo,
}
return f, m
}

t.Run("list", func(t *testing.T) {
f, mockBlockStoreProvider := setup()
f, mockBlockStoreProvider := setup(nil)
mockBlockStoreProvider.ListReturns(nil, errors.New("boogie"))
require.PanicsWithValue(
t,
Expand All @@ -45,16 +50,22 @@ func TestBlockStoreProviderErrors(t *testing.T) {
})

t.Run("open", func(t *testing.T) {
f, mockBlockStoreProvider := setup()
f, mockBlockStoreProvider := setup(nil)
mockBlockStoreProvider.OpenReturns(nil, errors.New("woogie"))
_, err := f.GetOrCreate("foo")
require.EqualError(t, err, "woogie")
require.Empty(t, f.ledgers, "Expected no new ledger is created")
})

t.Run("remove", func(t *testing.T) {
dir, err := ioutil.TempDir("", "fileledger")
require.NoError(t, err, "Error creating temp dir: %s", err)
defer os.RemoveAll(dir)
fileRepo, err := filerepo.New(filepath.Join(dir, "filerepo"), "remove")
require.NoError(t, err, "Error creating temp file repo: %s", err)

t.Run("ledger doesn't exist", func(t *testing.T) {
f, mockBlockStoreProvider := setup()
f, mockBlockStoreProvider := setup(fileRepo)
err := f.Remove("foo")
require.NoError(t, err)
require.Equal(t, 1, mockBlockStoreProvider.DropCallCount())
Expand All @@ -63,7 +74,7 @@ func TestBlockStoreProviderErrors(t *testing.T) {
})

t.Run("dropping the blockstore fails", func(t *testing.T) {
f, mockBlockStoreProvider := setup()
f, mockBlockStoreProvider := setup(fileRepo)
mockBlockStore := &mock.FileLedgerBlockStore{}
f.ledgers["bar"] = &FileLedger{blockStore: mockBlockStore}
mockBlockStoreProvider.DropReturns(errors.New("oogie"))
Expand Down Expand Up @@ -106,13 +117,120 @@ func TestMultiReinitialization(t *testing.T) {
require.Equal(t, 3, len(f.ChannelIDs()), "Expected channel to be recovered")
f.Close()

bar2FileRepoDir := filepath.Join(dir, "filerepo", "remove", "bar2.remove")
_, err = os.Create(bar2FileRepoDir)
require.NoError(t, err, "Error creating temp file: %s", err)

bar2ChainsDir := filepath.Join(dir, "chains", "bar2")
err = os.MkdirAll(bar2ChainsDir, 0700)
require.NoError(t, err, "Error creating temp dir: %s", err)
_, err = os.Create(filepath.Join(bar2ChainsDir, "blockfile_000000"))
require.NoError(t, err, "Error creating temp file: %s", err)

f, err = New(dir, metricsProvider)
require.NoError(t, err)

err = f.Remove("bar")
require.NoError(t, err, "Error removing channel")
require.Equal(t, 2, len(f.ChannelIDs()))
err = f.Remove("this-isnt-an-existing-channel")
require.NoError(t, err, "Error removing channel")
require.Equal(t, 2, len(f.ChannelIDs()))

_, err = os.Stat(bar2ChainsDir)
require.EqualError(t, err, fmt.Sprintf("stat %s: no such file or directory", bar2ChainsDir))

_, err = os.Stat(bar2FileRepoDir)
require.EqualError(t, err, fmt.Sprintf("stat %s: no such file or directory", bar2FileRepoDir))
f.Close()
}

func TestNewErrors(t *testing.T) {
metricsProvider := &disabled.Provider{}

t.Run("creation of filerepo fails", func(t *testing.T) {
dir, err := ioutil.TempDir("", "fileledger")
require.NoError(t, err, "Error creating temp dir: %s", err)
defer os.RemoveAll(dir)

fileRepoDir := filepath.Join(dir, "filerepo", "remove")
err = os.MkdirAll(fileRepoDir, 0700)
require.NoError(t, err, "Error creating temp dir: %s", err)
removeFile := filepath.Join(fileRepoDir, "rojo.remove")
_, err = os.Create(removeFile)
require.NoError(t, err, "Error creating temp file: %s", err)
err = os.Chmod(removeFile, 0444)
err = os.Chmod(filepath.Join(dir, "filerepo", "remove"), 0444)
require.NoError(t, err, "Error changing permissions of temp file: %s", err)

_, err = New(dir, metricsProvider)
require.EqualError(t, err, fmt.Sprintf("error checking if dir [%s] is empty: lstat %s: permission denied", fileRepoDir, removeFile))
})

t.Run("removal fails", func(t *testing.T) {
dir, err := ioutil.TempDir("", "fileledger")
require.NoError(t, err, "Error creating temp dir: %s", err)
defer os.RemoveAll(dir)

fileRepoDir := filepath.Join(dir, "filerepo", "remove")
err = os.MkdirAll(fileRepoDir, 0777)
require.NoError(t, err, "Error creating temp dir: %s", err)
removeFile := filepath.Join(fileRepoDir, "rojo.remove")
_, err = os.Create(removeFile)
require.NoError(t, err, "Error creating temp file: %s", err)
err = os.Chmod(removeFile, 0444)
err = os.Chmod(filepath.Join(dir, "filerepo", "remove"), 0544)
require.NoError(t, err, "Error changing permissions of temp file: %s", err)

_, err = New(dir, metricsProvider)
require.EqualError(t, err, fmt.Sprintf("unlinkat %s: permission denied", removeFile))
})
}

func TestRemove(t *testing.T) {
mockBlockStore := &mock.BlockStoreProvider{}
dir, err := ioutil.TempDir("", "fileledger")
require.NoError(t, err, "Error creating temp dir: %s", err)
defer os.RemoveAll(dir)

fileRepo, err := filerepo.New(filepath.Join(dir, "filerepo"), "remove")
require.NoError(t, err, "Error creating temp file repo: %s", err)
f := &fileLedgerFactory{
blkstorageProvider: mockBlockStore,
ledgers: map[string]*FileLedger{},
removeFileRepo: fileRepo,
}
defer f.Close()

t.Run("success", func(t *testing.T) {
dest := filepath.Join(dir, "filerepo", "remove", "foo.remove")
mockBlockStore.DropCalls(func(string) error {
_, err = os.Stat(dest)
require.NoError(t, err, "Expected foo.remove to exist")
return nil
})
err = f.Remove("foo")
require.NoError(t, err, "Error removing channel")
require.Equal(t, 1, mockBlockStore.DropCallCount(), "Expected 1 Drop() calls")

_, err = os.Stat(dest)
require.EqualError(t, err, fmt.Sprintf("stat %s: no such file or directory", dest))
})

t.Run("drop fails", func(t *testing.T) {
mockBlockStore.DropReturns(errors.New("oogie"))
err = f.Remove("foo")
require.EqualError(t, err, "oogie")

dest := filepath.Join(dir, "filerepo", "remove", "foo.remove")
_, err = os.Stat(dest)
require.NoError(t, err, "Expected foo.remove to exist")
})

t.Run("saving to file repo fails", func(t *testing.T) {
os.RemoveAll(dir)
mockBlockStore.DropReturns(nil)
err = f.Remove("foo")
require.EqualError(t, err, fmt.Sprintf("error while creating file:%s/filerepo/remove/foo.remove~: open %s/filerepo/remove/foo.remove~: no such file or directory", dir, dir))
})
}
2 changes: 1 addition & 1 deletion orderer/common/filerepo/filerepo.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ func (r *Repo) Save(baseName string, content []byte) error {
dest := r.baseToFilePath(baseName)

if _, err := os.Stat(dest); err == nil {
return errors.Errorf("file already exists at %s", dest)
return os.ErrExist
}

tmpFileName := fileName + r.transientFileMarker
Expand Down
5 changes: 1 addition & 4 deletions orderer/common/filerepo/filerepo_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ SPDX-License-Identifier: Apache-2.0
package filerepo_test

import (
"fmt"
"io/ioutil"
"os"
"path/filepath"
Expand Down Expand Up @@ -113,10 +112,8 @@ func TestFileRepo_SaveFailure(t *testing.T) {
r, err := filerepo.New("testdata", "joinblock")
require.NoError(t, err)

filePath := filepath.Join("testdata", "joinblock", "mychannel.joinblock")

err = r.Save("mychannel", []byte{})
require.EqualError(t, err, fmt.Sprintf("file already exists at %s", filePath))
require.EqualError(t, err, os.ErrExist.Error())
}

func TestFileRepo_Remove(t *testing.T) {
Expand Down