Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FAB-17800] Reset/Rollback returns error if a channel was bootstrapped from a snapshot #1990

Merged
merged 1 commit into from
Oct 13, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 34 additions & 0 deletions common/ledger/blkstorage/blockfile_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/davecgh/go-spew/spew"
"github.com/golang/protobuf/proto"
"github.com/hyperledger/fabric-protos-go/common"
"github.com/hyperledger/fabric/internal/fileutil"
"github.com/pkg/errors"
)

Expand Down Expand Up @@ -184,3 +185,36 @@ func loadBootstrappingSnapshotInfo(rootDir string) (*BootstrappingSnapshotInfo,
}
return bsi, nil
}

func IsBootstrappedFromSnapshot(blockStorageDir, ledgerID string) (bool, error) {
ledgerDir := filepath.Join(blockStorageDir, ChainsDir, ledgerID)
_, err := os.Stat(filepath.Join(ledgerDir, bootstrappingSnapshotInfoFile))
if os.IsNotExist(err) {
return false, nil
}
if err != nil {
return false, errors.Wrapf(err, "failed to read bootstrappingSnapshotInfo file under blockstore directory %s", ledgerDir)
}
return true, nil
}

func GetLedgersBootstrappedFromSnapshot(blockStorageDir string) ([]string, error) {
chainsDir := filepath.Join(blockStorageDir, ChainsDir)
ledgerIDs, err := fileutil.ListSubdirs(chainsDir)
if err != nil {
return nil, err
}

isFromSnapshot := false
ledgersFromSnapshot := []string{}
for _, ledgerID := range ledgerIDs {
if isFromSnapshot, err = IsBootstrappedFromSnapshot(blockStorageDir, ledgerID); err != nil {
return nil, err
}
if isFromSnapshot {
ledgersFromSnapshot = append(ledgersFromSnapshot, ledgerID)
}
}

return ledgersFromSnapshot, nil
}
69 changes: 69 additions & 0 deletions common/ledger/blkstorage/blockfile_helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,10 @@ SPDX-License-Identifier: Apache-2.0
package blkstorage

import (
"fmt"
"io/ioutil"
"os"
"path/filepath"
"testing"

"github.com/golang/protobuf/proto"
Expand Down Expand Up @@ -114,6 +116,73 @@ func TestBinarySearchBlockFileNum(t *testing.T) {
}
}

func TestIsBootstrappedFromSnapshot(t *testing.T) {
testDir, err := ioutil.TempDir("", "isbootstrappedfromsnapshot")
require.NoError(t, err)
defer os.RemoveAll(testDir)

t.Run("no_bootstrapping_snapshot_info_file", func(t *testing.T) {
// create chains directory for the ledger without bootstrappingSnapshotInfoFile
ledgerid := "testnosnapshotinfofile"
require.NoError(t, os.MkdirAll(filepath.Join(testDir, ChainsDir, ledgerid), 0755))
isFromSnapshot, err := IsBootstrappedFromSnapshot(testDir, ledgerid)
require.NoError(t, err)
require.False(t, isFromSnapshot)
})

t.Run("with_bootstrapping_snapshot_info_file", func(t *testing.T) {
// create chains directory for the ledger with bootstrappingSnapshotInfoFile
ledgerid := "testwithsnapshotinfofile"
ledgerChainDir := filepath.Join(testDir, ChainsDir, ledgerid)
require.NoError(t, os.MkdirAll(ledgerChainDir, 0755))
file, err := os.Create(filepath.Join(ledgerChainDir, bootstrappingSnapshotInfoFile))
require.NoError(t, err)
defer file.Close()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

better to remove "defer"

isFromSnapshot, err := IsBootstrappedFromSnapshot(testDir, ledgerid)
require.NoError(t, err)
require.True(t, isFromSnapshot)
})
}

func TestGetLedgersBootstrappedFromSnapshot(t *testing.T) {
t.Run("no_bootstrapping_snapshot_info_file", func(t *testing.T) {
testDir, err := ioutil.TempDir("", "getledgersfromsnapshot_nosnapshot_info")
require.NoError(t, err)
defer os.RemoveAll(testDir)

// create chains directories for ledgers without bootstrappingSnapshotInfoFile
for i := 0; i < 5; i++ {
require.NoError(t, os.MkdirAll(filepath.Join(testDir, ChainsDir, fmt.Sprintf("ledger_%d", i)), 0755))
}

ledgersFromSnapshot, err := GetLedgersBootstrappedFromSnapshot(testDir)
require.NoError(t, err)
require.Equal(t, 0, len(ledgersFromSnapshot))
})

t.Run("with_bootstrapping_snapshot_info_file", func(t *testing.T) {
testDir, err := ioutil.TempDir("", "getledgersfromsnapshot_snapshot_info")
require.NoError(t, err)
defer os.RemoveAll(testDir)

// create chains directories for ledgers
// also create bootstrappingSnapshotInfoFile for ledger_0 and ledger_1
for i := 0; i < 5; i++ {
ledgerChainDir := filepath.Join(testDir, ChainsDir, fmt.Sprintf("ledger_%d", i))
require.NoError(t, os.MkdirAll(ledgerChainDir, 0755))
if i < 2 {
file, err := os.Create(filepath.Join(ledgerChainDir, bootstrappingSnapshotInfoFile))
require.NoError(t, err)
defer file.Close()
}
}

ledgersFromSnapshot, err := GetLedgersBootstrappedFromSnapshot(testDir)
require.NoError(t, err)
require.ElementsMatch(t, ledgersFromSnapshot, []string{"ledger_0", "ledger_1"})
})
}

func checkBlockfilesInfoFromFS(t *testing.T, blkStoreDir string, expected *blockfilesInfo) {
blkfilesInfo, err := constructBlockfilesInfo(blkStoreDir)
require.NoError(t, err)
Expand Down
9 changes: 9 additions & 0 deletions core/ledger/kvledger/reset.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,15 @@ func ResetAllKVLedgers(rootFSPath string) error {
}
defer fileLock.Unlock()

blockstorePath := BlockStorePath(rootFSPath)
ledgerIDs, err := blkstorage.GetLedgersBootstrappedFromSnapshot(blockstorePath)
if err != nil {
return err
}
if len(ledgerIDs) > 0 {
return errors.Errorf("cannot reset channels because at least one channel was bootstrapped from a snapshot: %+v", ledgerIDs)
}

logger.Info("Resetting all channel ledgers to genesis block")
logger.Infof("Ledger data folder from config = [%s]", rootFSPath)
if err := dropDBs(rootFSPath); err != nil {
Expand Down
8 changes: 8 additions & 0 deletions core/ledger/kvledger/rollback.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,14 @@ func RollbackKVLedger(rootFSPath, ledgerID string, blockNum uint64) error {
defer fileLock.Unlock()

blockstorePath := BlockStorePath(rootFSPath)
isFromSnapshot, err := blkstorage.IsBootstrappedFromSnapshot(blockstorePath, ledgerID)
if err != nil {
return err
}
if isFromSnapshot {
return errors.Errorf("cannot rollback channel [%s] because it was bootstrapped from a snapshot", ledgerID)
}

if err := blkstorage.ValidateRollbackParams(blockstorePath, ledgerID, blockNum); err != nil {
return err
}
Expand Down
56 changes: 56 additions & 0 deletions core/ledger/kvledger/tests/reset_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ package tests
import (
"fmt"
"testing"
"time"

"github.com/hyperledger/fabric-protos-go/common"
"github.com/hyperledger/fabric/common/ledger/blkstorage"
Expand Down Expand Up @@ -217,3 +218,58 @@ func TestResetLedgerWithoutDroppingDBs(t *testing.T) {
"This is possible when the state database is not dropped after a ledger reset/rollback. "+
"The state database can safely be dropped and will be rebuilt up to block store height upon the next peer start")
}

func TestResetFailIfAnyLedgerBootstrappedFromSnapshot(t *testing.T) {
env := newEnv(t)
defer env.cleanup()
env.initLedgerMgmt()

// populate ledgers with sample data
ledgerID := "testLedgerFromSnapshot"
dataHelper := newSampleDataHelper(t)
h := env.newTestHelperCreateLgr(ledgerID, t)
dataHelper.populateLedger(h)
dataHelper.verifyLedgerContent(h)
bcInfo, err := h.lgr.GetBlockchainInfo()
require.NoError(t, err)

// create a sanapshot
blockNum := bcInfo.Height - 1
require.NoError(t, h.lgr.SubmitSnapshotRequest(blockNum))
// wait until snapshot is generated
snapshotGenerated := func() bool {
requests, err := h.lgr.PendingSnapshotRequests()
require.NoError(t, err)
return len(requests) == 0
}
require.Eventually(t, snapshotGenerated, time.Minute, 100*time.Millisecond)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

100ms seem unnecessarily long to me...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The sample ledger has 9 blocks. When running UT locally, there were the following timestamps, so the elapsed time was 337-202=135ms.

2020-10-09 16:21:59.202 EDT [kvledger] func2 -> INFO 1dd^[[0m Generating snapshot channelID=testLedgerFromSnapshot lastCommittedBlockNumber=8

2020-10-09 16:21:59.337 EDT [kvledger] func2 -> INFO 200^[[0m Generated snapshot channelID=testLedgerFromSnapshot lastCommittedBlockNumber=8

snapshotDir := kvledger.SnapshotDirForLedgerBlockNum(env.initializer.Config.SnapshotsConfig.RootDir, ledgerID, blockNum)
env.closeLedgerMgmt()

// creates a new env with multiple ledgers, some from genesis block and some from a snapshot
env2 := newEnv(t)
defer env2.cleanup()
env2.initLedgerMgmt()

for i := 0; i < 2; i++ {
ledgerID := fmt.Sprintf("ledger-%d", i)
env2.newTestHelperCreateLgr(ledgerID, t)
}

callbackCounter := 0
callback := func(l ledger.PeerLedger, cid string) { callbackCounter++ }
require.NoError(t, env2.ledgerMgr.CreateLedgerFromSnapshot(snapshotDir, callback))

// wait until ledger creation is done
ledgerCreated := func() bool {
status := env2.ledgerMgr.JoinBySnapshotStatus()
return !status.InProgress && status.BootstrappingSnapshotDir == ""
}
require.Eventually(t, ledgerCreated, time.Minute, 100*time.Microsecond)
require.Equal(t, 1, callbackCounter)
env2.closeLedgerMgmt()

wenjianqiao marked this conversation as resolved.
Show resolved Hide resolved
// reset should fail
err = kvledger.ResetAllKVLedgers(env2.initializer.Config.RootFSPath)
require.EqualError(t, err, "cannot reset channels because at least one channel was bootstrapped from a snapshot: [testLedgerFromSnapshot]")
}
51 changes: 51 additions & 0 deletions core/ledger/kvledger/tests/rollback_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ package tests
import (
"fmt"
"testing"
"time"

"github.com/hyperledger/fabric/core/ledger"
"github.com/hyperledger/fabric/core/ledger/kvledger"
Expand Down Expand Up @@ -159,3 +160,53 @@ func TestRollbackKVLedgerWithBTL(t *testing.T) {
r.pvtdataShouldNotContain("cc1", "coll2") // <cc1, coll2> shold have been purged from the pvtdata storage
})
}

func TestRollbackFailIfLedgerBootstrappedFromSnapshot(t *testing.T) {
env := newEnv(t)
defer env.cleanup()
env.initLedgerMgmt()

// populate ledgers with sample data
ledgerID := "testLedgerFromSnapshot"
dataHelper := newSampleDataHelper(t)
h := env.newTestHelperCreateLgr(ledgerID, t)
dataHelper.populateLedger(h)
dataHelper.verifyLedgerContent(h)
bcInfo, err := h.lgr.GetBlockchainInfo()
require.NoError(t, err)

// create a sanapshot
blockNum := bcInfo.Height - 1
require.NoError(t, h.lgr.SubmitSnapshotRequest(blockNum))
// wait until snapshot is generated
snapshotGenerated := func() bool {
requests, err := h.lgr.PendingSnapshotRequests()
require.NoError(t, err)
return len(requests) == 0
}
require.Eventually(t, snapshotGenerated, time.Minute, 100*time.Millisecond)
snapshotDir := kvledger.SnapshotDirForLedgerBlockNum(env.initializer.Config.SnapshotsConfig.RootDir, ledgerID, blockNum)
env.closeLedgerMgmt()

// bootstrap a ledger from the snapshot
env2 := newEnv(t)
defer env2.cleanup()
env2.initLedgerMgmt()

callbackCounter := 0
callback := func(l ledger.PeerLedger, cid string) { callbackCounter++ }
require.NoError(t, env2.ledgerMgr.CreateLedgerFromSnapshot(snapshotDir, callback))

// wait until ledger creation is done
ledgerCreated := func() bool {
status := env2.ledgerMgr.JoinBySnapshotStatus()
return !status.InProgress && status.BootstrappingSnapshotDir == ""
}
require.Eventually(t, ledgerCreated, time.Minute, 100*time.Millisecond)
require.Equal(t, 1, callbackCounter)
env2.closeLedgerMgmt()

// rollback the ledger should fail
err = kvledger.RollbackKVLedger(env2.initializer.Config.RootFSPath, ledgerID, 1)
require.EqualError(t, err, "cannot rollback channel [testLedgerFromSnapshot] because it was bootstrapped from a snapshot")
}
23 changes: 2 additions & 21 deletions internal/peer/node/reset_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,9 @@ SPDX-License-Identifier: Apache-2.0
package node

import (
"io/ioutil"
"os"
"path"
"path/filepath"
"testing"

"github.com/hyperledger/fabric/core/config"
"github.com/hyperledger/fabric/internal/fileutil"
"github.com/spf13/viper"
"github.com/stretchr/testify/require"
)
Expand All @@ -25,21 +20,7 @@ func TestResetCmd(t *testing.T) {
viper.Set("peer.fileSystemPath", testPath)
defer os.RemoveAll(testPath)

viper.Set("logging.ledger", "INFO")
rootFSPath := filepath.Join(config.GetPath("peer.fileSystemPath"), "ledgersData")
historyDBPath := filepath.Join(rootFSPath, "historyLeveldb")
require.NoError(t,
os.MkdirAll(historyDBPath, 0755),
)
require.NoError(t,
ioutil.WriteFile(path.Join(historyDBPath, "dummyfile.txt"), []byte("this is a dummy file for test"), 0644),
)
cmd := resetCmd()

_, err := os.Stat(historyDBPath)
require.False(t, os.IsNotExist(err))
require.NoError(t, cmd.Execute())
empty, err := fileutil.DirEmpty(historyDBPath)
require.NoError(t, err)
require.True(t, empty)
err := cmd.Execute()
require.Contains(t, err.Error(), "open /tmp/hyperledger/test/ledgersData/chains/chains: no such file or directory")
}