Skip to content

Commit

Permalink
Merge "fix pvt and blockStore recovery for reset/rollback" into relea…
Browse files Browse the repository at this point in the history
…se-1.4
  • Loading branch information
denyeart authored and Gerrit Code Review committed Aug 8, 2019
2 parents ea7cd9c + 861847b commit cbf6372
Show file tree
Hide file tree
Showing 2 changed files with 118 additions and 41 deletions.
38 changes: 10 additions & 28 deletions core/ledger/ledgerstorage/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
"github.com/hyperledger/fabric/core/ledger/pvtdatapolicy"
"github.com/hyperledger/fabric/core/ledger/pvtdatastorage"
"github.com/hyperledger/fabric/protos/common"
"github.com/pkg/errors"
)

var logger = flogging.MustGetLogger("ledgerstorage")
Expand Down Expand Up @@ -287,7 +286,7 @@ func (s *Store) init() error {
if initialized, err = s.initPvtdataStoreFromExistingBlockchain(); err != nil || initialized {
return err
}
return s.syncPvtdataStoreWithBlockStore()
return s.commitPendingBatchInPvtdataStore()
}

// initPvtdataStoreFromExistingBlockchain updates the initial state of the pvtdata store
Expand Down Expand Up @@ -317,14 +316,10 @@ func (s *Store) initPvtdataStoreFromExistingBlockchain() (bool, error) {
return false, nil
}

// syncPvtdataStoreWithBlockStore checks whether the block storage and pvt data store are in sync
// this is called when the store instance is constructed and handed over for the use.
// this check whether there is a pending batch (possibly from a previous system crash)
// of pvt data that was not committed. If a pending batch exists, the check is made
// whether the associated block was successfully committed in the block storage (before the crash)
// or not. If the block was committed, the private data batch is committed
// otherwise, the pvt data batch is rolledback
func (s *Store) syncPvtdataStoreWithBlockStore() error {
// commitPendingBatchInPvtdataStore checks whether there is a pending batch
// (possibly from a previous system crash) of pvt data that was not committed.
// If a pending batch exists, the batch is committed.
func (s *Store) commitPendingBatchInPvtdataStore() error {
var pendingPvtbatch bool
var err error
if pendingPvtbatch, err = s.pvtdataStore.HasPendingBatch(); err != nil {
Expand All @@ -333,25 +328,12 @@ func (s *Store) syncPvtdataStoreWithBlockStore() error {
if !pendingPvtbatch {
return nil
}
var bcInfo *common.BlockchainInfo
var pvtdataStoreHt uint64

if bcInfo, err = s.GetBlockchainInfo(); err != nil {
return err
}
if pvtdataStoreHt, err = s.pvtdataStore.LastCommittedBlockHeight(); err != nil {
return err
}

if bcInfo.Height == pvtdataStoreHt {
return s.pvtdataStore.Rollback()
}

if bcInfo.Height == pvtdataStoreHt+1 {
return s.pvtdataStore.Commit()
}

return errors.Errorf("This is not expected. blockStoreHeight=%d, pvtdataStoreHeight=%d", bcInfo.Height, pvtdataStoreHt)
// we can safetly commit the pending batch as gossip would avoid
// fetching pvtData if already exist in the local pvtdataStore.
// when the pvtdataStore height is greater than the blockstore,
// pvtdata reconciler will not fetch any missing pvtData.
return s.pvtdataStore.Commit()
}

func constructPvtdataMap(pvtdata []*ledger.TxPvtData) map[uint64]*ledger.TxPvtData {
Expand Down
121 changes: 108 additions & 13 deletions core/ledger/ledgerstorage/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"github.com/hyperledger/fabric/core/ledger/ledgerconfig"
"github.com/hyperledger/fabric/core/ledger/pvtdatapolicy"
btltestutil "github.com/hyperledger/fabric/core/ledger/pvtdatapolicy/testutil"
"github.com/hyperledger/fabric/core/ledger/pvtdatastorage"
lutil "github.com/hyperledger/fabric/core/ledger/util"
"github.com/hyperledger/fabric/protos/common"
"github.com/hyperledger/fabric/protos/ledger/rwset"
Expand Down Expand Up @@ -192,29 +191,72 @@ func TestCrashAfterPvtdataStorePreparation(t *testing.T) {
store.pvtdataStore.Prepare(blokNumAtCrash, pvtdataAtCrash, nil)
store.Shutdown()
provider.Close()

// restart the store
provider = NewProvider(metricsProvider)
store, err = provider.Open("testLedger")
assert.NoError(t, err)
store.Init(btlPolicyForSampleData())

// When starting the storage after a crash, there should not be a trace of last block write
_, err = store.GetPvtDataByNum(blokNumAtCrash, nil)
_, ok := err.(*pvtdatastorage.ErrOutOfRange)
assert.True(t, ok)
pvtdata, err := store.GetPvtDataByNum(blokNumAtCrash, nil)
assert.NoError(t, err)
constructed := constructPvtdataMap(pvtdata)
testVerifyPvtData(t, dataAtCrash.PvtData, constructed)

//we should be able to write the last block again
assert.NoError(t, store.CommitWithPvtData(dataAtCrash))
blkAndPvtdata, err := store.GetPvtDataAndBlockByNum(blokNumAtCrash, nil)
assert.NoError(t, err)
assert.True(t, proto.Equal(dataAtCrash.Block, blkAndPvtdata.Block))
testVerifyPvtData(t, dataAtCrash.PvtData, constructed)
}

func TestCrashAfterPvtdataStorePreparationWithReset(t *testing.T) {
testEnv := newTestEnv(t)
defer testEnv.cleanup()
provider := NewProvider(metricsProvider)
defer provider.Close()
store, err := provider.Open("testLedger")
store.Init(btlPolicyForSampleData())
defer store.Shutdown()
assert.NoError(t, err)

sampleData := sampleDataWithPvtdataForAllTxs(t)
dataBeforeCrash := sampleData[0:3]
dataAtCrash := sampleData[3]

for _, sampleDatum := range dataBeforeCrash {
assert.NoError(t, store.CommitWithPvtData(sampleDatum))
}
blokNumAtCrash := dataAtCrash.Block.Header.Number
var pvtdataAtCrash []*ledger.TxPvtData
for _, p := range dataAtCrash.PvtData {
pvtdataAtCrash = append(pvtdataAtCrash, p)
}
// Only call Prepare on pvt data store and mimic a crash
store.pvtdataStore.Prepare(blokNumAtCrash, pvtdataAtCrash, nil)
store.Shutdown()
provider.Close()

// reset the block store to the genesis block
fsblkstorage.ResetBlockStore(ledgerconfig.GetBlockStorePath())

// restart the store
provider = NewProvider(metricsProvider)
store, err = provider.Open("testLedger")
assert.NoError(t, err)
store.Init(btlPolicyForSampleData())

pvtdata, err := store.GetPvtDataByNum(blokNumAtCrash, nil)
assert.NoError(t, err)
constructed := constructPvtdataMap(pvtdata)
for k, v := range dataAtCrash.PvtData {
ov, ok := constructed[k]
assert.True(t, ok)
assert.Equal(t, v.SeqInBlock, ov.SeqInBlock)
assert.True(t, proto.Equal(v.WriteSet, ov.WriteSet))
}
for k, v := range constructed {
ov, ok := dataAtCrash.PvtData[k]
testVerifyPvtData(t, dataAtCrash.PvtData, constructed)
}

func testVerifyPvtData(t *testing.T, blkPvtdata ledger.TxPvtDataMap, pvtdata ledger.TxPvtDataMap) {
assert.Equal(t, len(blkPvtdata), len(pvtdata))
for k, v := range blkPvtdata {
ov, ok := pvtdata[k]
assert.True(t, ok)
assert.Equal(t, v.SeqInBlock, ov.SeqInBlock)
assert.True(t, proto.Equal(v.WriteSet, ov.WriteSet))
Expand Down Expand Up @@ -250,14 +292,67 @@ func TestCrashBeforePvtdataStoreCommit(t *testing.T) {
store.BlockStore.AddBlock(dataAtCrash.Block)
store.Shutdown()
provider.Close()

provider = NewProvider(metricsProvider)
store, err = provider.Open("testLedger")
assert.NoError(t, err)
store.Init(btlPolicyForSampleData())

pvtdata, err := store.GetPvtDataByNum(blokNumAtCrash, nil)
assert.NoError(t, err)
constructed := constructPvtdataMap(pvtdata)
testVerifyPvtData(t, dataAtCrash.PvtData, constructed)

// both the block and pvtData should exist
blkAndPvtdata, err := store.GetPvtDataAndBlockByNum(blokNumAtCrash, nil)
assert.NoError(t, err)
assert.Equal(t, dataAtCrash.MissingPvtData, blkAndPvtdata.MissingPvtData)
assert.True(t, proto.Equal(dataAtCrash.Block, blkAndPvtdata.Block))
testVerifyPvtData(t, dataAtCrash.PvtData, blkAndPvtdata.PvtData)
}

func TestCrashBeforePvtdataStoreCommitWithReset(t *testing.T) {
testEnv := newTestEnv(t)
defer testEnv.cleanup()
provider := NewProvider(metricsProvider)
defer provider.Close()
store, err := provider.Open("testLedger")
store.Init(btlPolicyForSampleData())
defer store.Shutdown()
assert.NoError(t, err)

sampleData := sampleDataWithPvtdataForAllTxs(t)
dataBeforeCrash := sampleData[0:3]
dataAtCrash := sampleData[3]

for _, sampleDatum := range dataBeforeCrash {
assert.NoError(t, store.CommitWithPvtData(sampleDatum))
}
blokNumAtCrash := dataAtCrash.Block.Header.Number
var pvtdataAtCrash []*ledger.TxPvtData
for _, p := range dataAtCrash.PvtData {
pvtdataAtCrash = append(pvtdataAtCrash, p)
}

// Mimic a crash just short of calling the final commit on pvtdata store
// After starting the store again, the block and the pvtdata should be available
store.pvtdataStore.Prepare(blokNumAtCrash, pvtdataAtCrash, nil)
store.BlockStore.AddBlock(dataAtCrash.Block)
store.Shutdown()
provider.Close()

// reset the block store to the genesis block
fsblkstorage.ResetBlockStore(ledgerconfig.GetBlockStorePath())

provider = NewProvider(metricsProvider)
store, err = provider.Open("testLedger")
assert.NoError(t, err)
store.Init(btlPolicyForSampleData())

pvtdata, err := store.GetPvtDataByNum(blokNumAtCrash, nil)
assert.NoError(t, err)
constructed := constructPvtdataMap(pvtdata)
testVerifyPvtData(t, dataAtCrash.PvtData, constructed)
}

func TestAddAfterPvtdataStoreError(t *testing.T) {
Expand Down

0 comments on commit cbf6372

Please sign in to comment.